diff options
author | Louis Vézina <[email protected]> | 2020-06-10 12:04:54 -0400 |
---|---|---|
committer | Louis Vézina <[email protected]> | 2020-06-10 12:04:54 -0400 |
commit | c6548c06b7bb769af656d1eb18cc12e108260990 (patch) | |
tree | c99c6bf789f9c94d0776215ef205dc26564f310d /libs/tqdm | |
parent | f79faaa5c53306a37ee47f3c1725268c855a8f3d (diff) | |
download | bazarr-c6548c06b7bb769af656d1eb18cc12e108260990.tar.gz bazarr-c6548c06b7bb769af656d1eb18cc12e108260990.zip |
Subsync first implementation (only after download/upload).
Diffstat (limited to 'libs/tqdm')
35 files changed, 7038 insertions, 0 deletions
diff --git a/libs/tqdm/__init__.py b/libs/tqdm/__init__.py new file mode 100644 index 000000000..670d6457e --- /dev/null +++ b/libs/tqdm/__init__.py @@ -0,0 +1,40 @@ +from .std import tqdm, trange +from .gui import tqdm as tqdm_gui # TODO: remove in v5.0.0 +from .gui import trange as tgrange # TODO: remove in v5.0.0 +from ._tqdm_pandas import tqdm_pandas +from .cli import main # TODO: remove in v5.0.0 +from ._monitor import TMonitor, TqdmSynchronisationWarning +from ._version import __version__ # NOQA +from .std import TqdmTypeError, TqdmKeyError, TqdmWarning, \ + TqdmDeprecationWarning, TqdmExperimentalWarning, \ + TqdmMonitorWarning + +__all__ = ['tqdm', 'tqdm_gui', 'trange', 'tgrange', 'tqdm_pandas', + 'tqdm_notebook', 'tnrange', 'main', 'TMonitor', + 'TqdmTypeError', 'TqdmKeyError', + 'TqdmWarning', 'TqdmDeprecationWarning', + 'TqdmExperimentalWarning', + 'TqdmMonitorWarning', 'TqdmSynchronisationWarning', + '__version__'] + + +def tqdm_notebook(*args, **kwargs): # pragma: no cover + """See tqdm.notebook.tqdm for full documentation""" + from .notebook import tqdm as _tqdm_notebook + from warnings import warn + warn("This function will be removed in tqdm==5.0.0\n" + "Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`", + TqdmDeprecationWarning, stacklevel=2) + return _tqdm_notebook(*args, **kwargs) + + +def tnrange(*args, **kwargs): # pragma: no cover + """ + A shortcut for `tqdm.notebook.tqdm(xrange(*args), **kwargs)`. + On Python3+, `range` is used instead of `xrange`. + """ + from .notebook import trange as _tnrange + from warnings import warn + warn("Please use `tqdm.notebook.trange` instead of `tqdm.tnrange`", + TqdmDeprecationWarning, stacklevel=2) + return _tnrange(*args, **kwargs) diff --git a/libs/tqdm/__main__.py b/libs/tqdm/__main__.py new file mode 100644 index 000000000..130bc6375 --- /dev/null +++ b/libs/tqdm/__main__.py @@ -0,0 +1,2 @@ +from .cli import main +main() diff --git a/libs/tqdm/_main.py b/libs/tqdm/_main.py new file mode 100644 index 000000000..07b6730b1 --- /dev/null +++ b/libs/tqdm/_main.py @@ -0,0 +1,7 @@ +from .cli import * # NOQA +from .cli import __all__ # NOQA +from .std import TqdmDeprecationWarning +from warnings import warn +warn("This function will be removed in tqdm==5.0.0\n" + "Please use `tqdm.cli.*` instead of `tqdm._main.*`", + TqdmDeprecationWarning, stacklevel=2) diff --git a/libs/tqdm/_monitor.py b/libs/tqdm/_monitor.py new file mode 100644 index 000000000..e1e257069 --- /dev/null +++ b/libs/tqdm/_monitor.py @@ -0,0 +1,99 @@ +from threading import Event, Thread, current_thread +from time import time +from warnings import warn +import atexit +__all__ = ["TMonitor", "TqdmSynchronisationWarning"] + + +class TqdmSynchronisationWarning(RuntimeWarning): + """tqdm multi-thread/-process errors which may cause incorrect nesting + but otherwise no adverse effects""" + pass + + +class TMonitor(Thread): + """ + Monitoring thread for tqdm bars. + Monitors if tqdm bars are taking too much time to display + and readjusts miniters automatically if necessary. + + Parameters + ---------- + tqdm_cls : class + tqdm class to use (can be core tqdm or a submodule). + sleep_interval : fload + Time to sleep between monitoring checks. + """ + + # internal vars for unit testing + _time = None + _event = None + + def __init__(self, tqdm_cls, sleep_interval): + Thread.__init__(self) + self.daemon = True # kill thread when main killed (KeyboardInterrupt) + self.was_killed = Event() + self.woken = 0 # last time woken up, to sync with monitor + self.tqdm_cls = tqdm_cls + self.sleep_interval = sleep_interval + if TMonitor._time is not None: + self._time = TMonitor._time + else: + self._time = time + if TMonitor._event is not None: + self._event = TMonitor._event + else: + self._event = Event + atexit.register(self.exit) + self.start() + + def exit(self): + self.was_killed.set() + if self is not current_thread(): + self.join() + return self.report() + + def get_instances(self): + # returns a copy of started `tqdm_cls` instances + return [i for i in self.tqdm_cls._instances.copy() + # Avoid race by checking that the instance started + if hasattr(i, 'start_t')] + + def run(self): + cur_t = self._time() + while True: + # After processing and before sleeping, notify that we woke + # Need to be done just before sleeping + self.woken = cur_t + # Sleep some time... + self.was_killed.wait(self.sleep_interval) + # Quit if killed + if self.was_killed.is_set(): + return + # Then monitor! + # Acquire lock (to access _instances) + with self.tqdm_cls.get_lock(): + cur_t = self._time() + # Check tqdm instances are waiting too long to print + instances = self.get_instances() + for instance in instances: + # Check event in loop to reduce blocking time on exit + if self.was_killed.is_set(): + return + # Only if mininterval > 1 (else iterations are just slow) + # and last refresh exceeded maxinterval + if instance.miniters > 1 and \ + (cur_t - instance.last_print_t) >= \ + instance.maxinterval: + # force bypassing miniters on next iteration + # (dynamic_miniters adjusts mininterval automatically) + instance.miniters = 1 + # Refresh now! (works only for manual tqdm) + instance.refresh(nolock=True) + if instances != self.get_instances(): # pragma: nocover + warn("Set changed size during iteration" + + " (see https://github.com/tqdm/tqdm/issues/481)", + TqdmSynchronisationWarning, stacklevel=2) + + def report(self): + return not self.was_killed.is_set() diff --git a/libs/tqdm/_tqdm.py b/libs/tqdm/_tqdm.py new file mode 100644 index 000000000..694318ee7 --- /dev/null +++ b/libs/tqdm/_tqdm.py @@ -0,0 +1,7 @@ +from .std import * # NOQA +from .std import __all__ # NOQA +from .std import TqdmDeprecationWarning +from warnings import warn +warn("This function will be removed in tqdm==5.0.0\n" + "Please use `tqdm.std.*` instead of `tqdm._tqdm.*`", + TqdmDeprecationWarning, stacklevel=2) diff --git a/libs/tqdm/_tqdm_gui.py b/libs/tqdm/_tqdm_gui.py new file mode 100644 index 000000000..541f104fb --- /dev/null +++ b/libs/tqdm/_tqdm_gui.py @@ -0,0 +1,7 @@ +from .gui import * # NOQA +from .gui import __all__ # NOQA +from .std import TqdmDeprecationWarning +from warnings import warn +warn("This function will be removed in tqdm==5.0.0\n" + "Please use `tqdm.gui.*` instead of `tqdm._tqdm_gui.*`", + TqdmDeprecationWarning, stacklevel=2) diff --git a/libs/tqdm/_tqdm_notebook.py b/libs/tqdm/_tqdm_notebook.py new file mode 100644 index 000000000..dde999817 --- /dev/null +++ b/libs/tqdm/_tqdm_notebook.py @@ -0,0 +1,7 @@ +from .notebook import * # NOQA +from .notebook import __all__ # NOQA +from .std import TqdmDeprecationWarning +from warnings import warn +warn("This function will be removed in tqdm==5.0.0\n" + "Please use `tqdm.notebook.*` instead of `tqdm._tqdm_notebook.*`", + TqdmDeprecationWarning, stacklevel=2) diff --git a/libs/tqdm/_tqdm_pandas.py b/libs/tqdm/_tqdm_pandas.py new file mode 100644 index 000000000..234fafffe --- /dev/null +++ b/libs/tqdm/_tqdm_pandas.py @@ -0,0 +1,46 @@ +import sys + +__author__ = "github.com/casperdcl" +__all__ = ['tqdm_pandas'] + + +def tqdm_pandas(tclass, *targs, **tkwargs): + """ + Registers the given `tqdm` instance with + `pandas.core.groupby.DataFrameGroupBy.progress_apply`. + It will even close() the `tqdm` instance upon completion. + + Parameters + ---------- + tclass : tqdm class you want to use (eg, tqdm, tqdm_notebook, etc) + targs and tkwargs : arguments for the tqdm instance + + Examples + -------- + >>> import pandas as pd + >>> import numpy as np + >>> from tqdm import tqdm, tqdm_pandas + >>> + >>> df = pd.DataFrame(np.random.randint(0, 100, (100000, 6))) + >>> tqdm_pandas(tqdm, leave=True) # can use tqdm_gui, optional kwargs, etc + >>> # Now you can use `progress_apply` instead of `apply` + >>> df.groupby(0).progress_apply(lambda x: x**2) + + References + ---------- + https://stackoverflow.com/questions/18603270/ + progress-indicator-during-pandas-operations-python + """ + from tqdm import TqdmDeprecationWarning + + if isinstance(tclass, type) or (getattr(tclass, '__name__', '').startswith( + 'tqdm_')): # delayed adapter case + TqdmDeprecationWarning("""\ +Please use `tqdm.pandas(...)` instead of `tqdm_pandas(tqdm, ...)`. +""", fp_write=getattr(tkwargs.get('file', None), 'write', sys.stderr.write)) + tclass.pandas(*targs, **tkwargs) + else: + TqdmDeprecationWarning("""\ +Please use `tqdm.pandas(...)` instead of `tqdm_pandas(tqdm(...))`. +""", fp_write=getattr(tclass.fp, 'write', sys.stderr.write)) + type(tclass).pandas(deprecated_t=tclass) diff --git a/libs/tqdm/_utils.py b/libs/tqdm/_utils.py new file mode 100644 index 000000000..084327f47 --- /dev/null +++ b/libs/tqdm/_utils.py @@ -0,0 +1,6 @@ +from .utils import CUR_OS, IS_WIN, IS_NIX, RE_ANSI, _range, _unich, _unicode, colorama, WeakSet, _basestring, _OrderedDict, FormatReplace, Comparable, SimpleTextIOWrapper, _is_utf, _supports_unicode, _is_ascii, _screen_shape_wrapper, _screen_shape_windows, _screen_shape_tput, _screen_shape_linux, _environ_cols_wrapper, _term_move_up # NOQA +from .std import TqdmDeprecationWarning +from warnings import warn +warn("This function will be removed in tqdm==5.0.0\n" + "Please use `tqdm.utils.*` instead of `tqdm._utils.*`", + TqdmDeprecationWarning, stacklevel=2) diff --git a/libs/tqdm/_version.py b/libs/tqdm/_version.py new file mode 100644 index 000000000..26606cb03 --- /dev/null +++ b/libs/tqdm/_version.py @@ -0,0 +1,59 @@ +# Definition of the version number +import os +from io import open as io_open + +__all__ = ["__version__"] + +# major, minor, patch, -extra +version_info = 4, 46, 1 + +# Nice string for the version +__version__ = '.'.join(map(str, version_info)) + + +# auto -extra based on commit hash (if not tagged as release) +scriptdir = os.path.dirname(__file__) +gitdir = os.path.abspath(os.path.join(scriptdir, "..", ".git")) +if os.path.isdir(gitdir): # pragma: nocover + extra = None + # Open config file to check if we are in tqdm project + with io_open(os.path.join(gitdir, "config"), 'r') as fh_config: + if 'tqdm' in fh_config.read(): + # Open the HEAD file + with io_open(os.path.join(gitdir, "HEAD"), 'r') as fh_head: + extra = fh_head.readline().strip() + # in a branch => HEAD points to file containing last commit + if 'ref:' in extra: + # reference file path + ref_file = extra[5:] + branch_name = ref_file.rsplit('/', 1)[-1] + + ref_file_path = os.path.abspath(os.path.join(gitdir, ref_file)) + # check that we are in git folder + # (by stripping the git folder from the ref file path) + if os.path.relpath( + ref_file_path, gitdir).replace('\\', '/') != ref_file: + # out of git folder + extra = None + else: + # open the ref file + with io_open(ref_file_path, 'r') as fh_branch: + commit_hash = fh_branch.readline().strip() + extra = commit_hash[:8] + if branch_name != "master": + extra += '.' + branch_name + + # detached HEAD mode, already have commit hash + else: + extra = extra[:8] + + # Append commit hash (and branch) to version string if not tagged + if extra is not None: + try: + with io_open(os.path.join(gitdir, "refs", "tags", + 'v' + __version__)) as fdv: + if fdv.readline().strip()[:8] != extra[:8]: + __version__ += '-' + extra + except Exception as e: + if "No such file" not in str(e): + raise diff --git a/libs/tqdm/auto.py b/libs/tqdm/auto.py new file mode 100644 index 000000000..4dd171754 --- /dev/null +++ b/libs/tqdm/auto.py @@ -0,0 +1,6 @@ +import warnings +from .std import TqdmExperimentalWarning +with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=TqdmExperimentalWarning) + from .autonotebook import tqdm, trange +__all__ = ["tqdm", "trange"] diff --git a/libs/tqdm/autonotebook.py b/libs/tqdm/autonotebook.py new file mode 100644 index 000000000..0bcd42a13 --- /dev/null +++ b/libs/tqdm/autonotebook.py @@ -0,0 +1,18 @@ +import os + +try: + from IPython import get_ipython + if 'IPKernelApp' not in get_ipython().config: # pragma: no cover + raise ImportError("console") + if 'VSCODE_PID' in os.environ: # pragma: no cover + raise ImportError("vscode") +except: + from .std import tqdm, trange +else: # pragma: no cover + from .notebook import tqdm, trange + from .std import TqdmExperimentalWarning + from warnings import warn + warn("Using `tqdm.autonotebook.tqdm` in notebook mode." + " Use `tqdm.tqdm` instead to force console mode" + " (e.g. in jupyter console)", TqdmExperimentalWarning, stacklevel=2) +__all__ = ["tqdm", "trange"] diff --git a/libs/tqdm/cli.py b/libs/tqdm/cli.py new file mode 100644 index 000000000..bf1cddeba --- /dev/null +++ b/libs/tqdm/cli.py @@ -0,0 +1,239 @@ +from .std import tqdm, TqdmTypeError, TqdmKeyError +from ._version import __version__ # NOQA +import sys +import re +import logging +__all__ = ["main"] + + +def cast(val, typ): + log = logging.getLogger(__name__) + log.debug((val, typ)) + if " or " in typ: + for t in typ.split(" or "): + try: + return cast(val, t) + except TqdmTypeError: + pass + raise TqdmTypeError(val + ' : ' + typ) + + # sys.stderr.write('\ndebug | `val:type`: `' + val + ':' + typ + '`.\n') + if typ == 'bool': + if (val == 'True') or (val == ''): + return True + elif val == 'False': + return False + else: + raise TqdmTypeError(val + ' : ' + typ) + try: + return eval(typ + '("' + val + '")') + except: + if typ == 'chr': + return chr(ord(eval('"' + val + '"'))) + else: + raise TqdmTypeError(val + ' : ' + typ) + + +def posix_pipe(fin, fout, delim='\n', buf_size=256, + callback=lambda int: None # pragma: no cover + ): + """ + Params + ------ + fin : file with `read(buf_size : int)` method + fout : file with `write` (and optionally `flush`) methods. + callback : function(int), e.g.: `tqdm.update` + """ + fp_write = fout.write + + # tmp = '' + if not delim: + while True: + tmp = fin.read(buf_size) + + # flush at EOF + if not tmp: + getattr(fout, 'flush', lambda: None)() # pragma: no cover + return + + fp_write(tmp) + callback(len(tmp)) + # return + + buf = '' + # n = 0 + while True: + tmp = fin.read(buf_size) + + # flush at EOF + if not tmp: + if buf: + fp_write(buf) + callback(1 + buf.count(delim)) # n += 1 + buf.count(delim) + getattr(fout, 'flush', lambda: None)() # pragma: no cover + return # n + + while True: + try: + i = tmp.index(delim) + except ValueError: + buf += tmp + break + else: + fp_write(buf + tmp[:i + len(delim)]) + callback(1) # n += 1 + buf = '' + tmp = tmp[i + len(delim):] + + +# ((opt, type), ... ) +RE_OPTS = re.compile(r'\n {8}(\S+)\s{2,}:\s*([^,]+)') +# better split method assuming no positional args +RE_SHLEX = re.compile(r'\s*(?<!\S)--?([^\s=]+)(\s+|=|$)') + +# TODO: add custom support for some of the following? +UNSUPPORTED_OPTS = ('iterable', 'gui', 'out', 'file') + +# The 8 leading spaces are required for consistency +CLI_EXTRA_DOC = r""" + Extra CLI Options + ----------------- + name : type, optional + TODO: find out why this is needed. + delim : chr, optional + Delimiting character [default: '\n']. Use '\0' for null. + N.B.: on Windows systems, Python converts '\n' to '\r\n'. + buf_size : int, optional + String buffer size in bytes [default: 256] + used when `delim` is specified. + bytes : bool, optional + If true, will count bytes, ignore `delim`, and default + `unit_scale` to True, `unit_divisor` to 1024, and `unit` to 'B'. + manpath : str, optional + Directory in which to install tqdm man pages. + comppath : str, optional + Directory in which to place tqdm completion. + log : str, optional + CRITICAL|FATAL|ERROR|WARN(ING)|[default: 'INFO']|DEBUG|NOTSET. +""" + + +def main(fp=sys.stderr, argv=None): + """ + Parameters (internal use only) + --------- + fp : file-like object for tqdm + argv : list (default: sys.argv[1:]) + """ + if argv is None: + argv = sys.argv[1:] + try: + log = argv.index('--log') + except ValueError: + for i in argv: + if i.startswith('--log='): + logLevel = i[len('--log='):] + break + else: + logLevel = 'INFO' + else: + # argv.pop(log) + # logLevel = argv.pop(log) + logLevel = argv[log + 1] + logging.basicConfig( + level=getattr(logging, logLevel), + format="%(levelname)s:%(module)s:%(lineno)d:%(message)s") + log = logging.getLogger(__name__) + + d = tqdm.__init__.__doc__ + CLI_EXTRA_DOC + + opt_types = dict(RE_OPTS.findall(d)) + # opt_types['delim'] = 'chr' + + for o in UNSUPPORTED_OPTS: + opt_types.pop(o) + + log.debug(sorted(opt_types.items())) + + # d = RE_OPTS.sub(r' --\1=<\1> : \2', d) + split = RE_OPTS.split(d) + opt_types_desc = zip(split[1::3], split[2::3], split[3::3]) + d = ''.join('\n --{0}=<{0}> : {1}{2}'.format(*otd) + for otd in opt_types_desc if otd[0] not in UNSUPPORTED_OPTS) + + d = """Usage: + tqdm [--help | options] + +Options: + -h, --help Print this help and exit + -v, --version Print version and exit + +""" + d.strip('\n') + '\n' + + # opts = docopt(d, version=__version__) + if any(v in argv for v in ('-v', '--version')): + sys.stdout.write(__version__ + '\n') + sys.exit(0) + elif any(v in argv for v in ('-h', '--help')): + sys.stdout.write(d + '\n') + sys.exit(0) + + argv = RE_SHLEX.split(' '.join(["tqdm"] + argv)) + opts = dict(zip(argv[1::3], argv[3::3])) + + log.debug(opts) + opts.pop('log', True) + + tqdm_args = {'file': fp} + try: + for (o, v) in opts.items(): + try: + tqdm_args[o] = cast(v, opt_types[o]) + except KeyError as e: + raise TqdmKeyError(str(e)) + log.debug('args:' + str(tqdm_args)) + except: + fp.write('\nError:\nUsage:\n tqdm [--help | options]\n') + for i in sys.stdin: + sys.stdout.write(i) + raise + else: + buf_size = tqdm_args.pop('buf_size', 256) + delim = tqdm_args.pop('delim', '\n') + delim_per_char = tqdm_args.pop('bytes', False) + manpath = tqdm_args.pop('manpath', None) + comppath = tqdm_args.pop('comppath', None) + stdin = getattr(sys.stdin, 'buffer', sys.stdin) + stdout = getattr(sys.stdout, 'buffer', sys.stdout) + if manpath or comppath: + from os import path + from shutil import copyfile + from pkg_resources import resource_filename, Requirement + + def cp(src, dst): + """copies from src path to dst""" + copyfile(src, dst) + log.info("written:" + dst) + if manpath is not None: + cp(resource_filename(Requirement.parse('tqdm'), 'tqdm/tqdm.1'), + path.join(manpath, 'tqdm.1')) + if comppath is not None: + cp(resource_filename(Requirement.parse('tqdm'), + 'tqdm/completion.sh'), + path.join(comppath, 'tqdm_completion.sh')) + sys.exit(0) + if delim_per_char: + tqdm_args.setdefault('unit', 'B') + tqdm_args.setdefault('unit_scale', True) + tqdm_args.setdefault('unit_divisor', 1024) + log.debug(tqdm_args) + with tqdm(**tqdm_args) as t: + posix_pipe(stdin, stdout, '', buf_size, t.update) + elif delim == '\n': + log.debug(tqdm_args) + for i in tqdm(stdin, **tqdm_args): + stdout.write(i) + else: + log.debug(tqdm_args) + with tqdm(**tqdm_args) as t: + posix_pipe(stdin, stdout, delim, buf_size, t.update) diff --git a/libs/tqdm/completion.sh b/libs/tqdm/completion.sh new file mode 100644 index 000000000..fabd3f2d3 --- /dev/null +++ b/libs/tqdm/completion.sh @@ -0,0 +1,19 @@ +#!/usr/bin/env bash +_tqdm(){ + local cur prv + cur="${COMP_WORDS[COMP_CWORD]}" + prv="${COMP_WORDS[COMP_CWORD - 1]}" + + case ${prv} in + --bar_format|--buf_size|--comppath|--delim|--desc|--initial|--lock_args|--manpath|--maxinterval|--mininterval|--miniters|--ncols|--nrows|--position|--postfix|--smoothing|--total|--unit|--unit_divisor) + # await user input + ;; + "--log") + COMPREPLY=($(compgen -W 'CRITICAL FATAL ERROR WARN WARNING INFO DEBUG NOTSET' -- ${cur})) + ;; + *) + COMPREPLY=($(compgen -W '--ascii --bar_format --buf_size --bytes --comppath --delim --desc --disable --dynamic_ncols --help --initial --leave --lock_args --log --manpath --maxinterval --mininterval --miniters --ncols --nrows --position --postfix --smoothing --total --unit --unit_divisor --unit_scale --version --write_bytes -h -v' -- ${cur})) + ;; + esac +} +complete -F _tqdm tqdm diff --git a/libs/tqdm/contrib/__init__.py b/libs/tqdm/contrib/__init__.py new file mode 100644 index 000000000..01312cd05 --- /dev/null +++ b/libs/tqdm/contrib/__init__.py @@ -0,0 +1,80 @@ +""" +Thin wrappers around common functions. + +Subpackages contain potentially unstable extensions. +""" +from tqdm import tqdm +from tqdm.auto import tqdm as tqdm_auto +from tqdm.utils import ObjectWrapper +from copy import deepcopy +import functools +import sys +__author__ = {"github.com/": ["casperdcl"]} +__all__ = ['tenumerate', 'tzip', 'tmap'] + + +class DummyTqdmFile(ObjectWrapper): + """Dummy file-like that will write to tqdm""" + def write(self, x, nolock=False): + # Avoid print() second call (useless \n) + if len(x.rstrip()) > 0: + tqdm.write(x, file=self._wrapped, nolock=nolock) + + +def tenumerate(iterable, start=0, total=None, tqdm_class=tqdm_auto, + **tqdm_kwargs): + """ + Equivalent of `numpy.ndenumerate` or builtin `enumerate`. + + Parameters + ---------- + tqdm_class : [default: tqdm.auto.tqdm]. + """ + try: + import numpy as np + except ImportError: + pass + else: + if isinstance(iterable, np.ndarray): + return tqdm_class(np.ndenumerate(iterable), + total=total or iterable.size, **tqdm_kwargs) + return enumerate(tqdm_class(iterable, **tqdm_kwargs), start) + + +def _tzip(iter1, *iter2plus, **tqdm_kwargs): + """ + Equivalent of builtin `zip`. + + Parameters + ---------- + tqdm_class : [default: tqdm.auto.tqdm]. + """ + kwargs = deepcopy(tqdm_kwargs) + tqdm_class = kwargs.pop("tqdm_class", tqdm_auto) + for i in zip(tqdm_class(iter1, **tqdm_kwargs), *iter2plus): + yield i + + +def _tmap(function, *sequences, **tqdm_kwargs): + """ + Equivalent of builtin `map`. + + Parameters + ---------- + tqdm_class : [default: tqdm.auto.tqdm]. + """ + for i in _tzip(*sequences, **tqdm_kwargs): + yield function(*i) + + +if sys.version_info[:1] < (3,): + @functools.wraps(_tzip) + def tzip(*args, **kwargs): + return list(_tzip(*args, **kwargs)) + + @functools.wraps(_tmap) + def tmap(*args, **kwargs): + return list(_tmap(*args, **kwargs)) +else: + tzip = _tzip + tmap = _tmap diff --git a/libs/tqdm/contrib/concurrent.py b/libs/tqdm/contrib/concurrent.py new file mode 100644 index 000000000..197a5f8c5 --- /dev/null +++ b/libs/tqdm/contrib/concurrent.py @@ -0,0 +1,106 @@ +""" +Thin wrappers around `concurrent.futures`. +""" +from __future__ import absolute_import +from tqdm import TqdmWarning +from tqdm.auto import tqdm as tqdm_auto +from copy import deepcopy +try: + from operator import length_hint +except ImportError: + def length_hint(it, default=0): + """Returns `len(it)`, falling back to `default`""" + try: + return len(it) + except TypeError: + return default +try: + from os import cpu_count +except ImportError: + try: + from multiprocessing import cpu_count + except ImportError: + def cpu_count(): + return 4 +import sys +__author__ = {"github.com/": ["casperdcl"]} +__all__ = ['thread_map', 'process_map'] + + +def _executor_map(PoolExecutor, fn, *iterables, **tqdm_kwargs): + """ + Implementation of `thread_map` and `process_map`. + + Parameters + ---------- + tqdm_class : [default: tqdm.auto.tqdm]. + max_workers : [default: min(32, cpu_count() + 4)]. + chunksize : [default: 1]. + """ + kwargs = deepcopy(tqdm_kwargs) + if "total" not in kwargs: + kwargs["total"] = len(iterables[0]) + tqdm_class = kwargs.pop("tqdm_class", tqdm_auto) + max_workers = kwargs.pop("max_workers", min(32, cpu_count() + 4)) + chunksize = kwargs.pop("chunksize", 1) + pool_kwargs = dict(max_workers=max_workers) + sys_version = sys.version_info[:2] + if sys_version >= (3, 7): + # share lock in case workers are already using `tqdm` + pool_kwargs.update( + initializer=tqdm_class.set_lock, initargs=(tqdm_class.get_lock(),)) + map_args = {} + if not (3, 0) < sys_version < (3, 5): + map_args.update(chunksize=chunksize) + with PoolExecutor(**pool_kwargs) as ex: + return list(tqdm_class( + ex.map(fn, *iterables, **map_args), **kwargs)) + + +def thread_map(fn, *iterables, **tqdm_kwargs): + """ + Equivalent of `list(map(fn, *iterables))` + driven by `concurrent.futures.ThreadPoolExecutor`. + + Parameters + ---------- + tqdm_class : optional + `tqdm` class to use for bars [default: tqdm.auto.tqdm]. + max_workers : int, optional + Maximum number of workers to spawn; passed to + `concurrent.futures.ThreadPoolExecutor.__init__`. + [default: max(32, cpu_count() + 4)]. + """ + from concurrent.futures import ThreadPoolExecutor + return _executor_map(ThreadPoolExecutor, fn, *iterables, **tqdm_kwargs) + + +def process_map(fn, *iterables, **tqdm_kwargs): + """ + Equivalent of `list(map(fn, *iterables))` + driven by `concurrent.futures.ProcessPoolExecutor`. + + Parameters + ---------- + tqdm_class : optional + `tqdm` class to use for bars [default: tqdm.auto.tqdm]. + max_workers : int, optional + Maximum number of workers to spawn; passed to + `concurrent.futures.ProcessPoolExecutor.__init__`. + [default: min(32, cpu_count() + 4)]. + chunksize : int, optional + Size of chunks sent to worker processes; passed to + `concurrent.futures.ProcessPoolExecutor.map`. [default: 1]. + """ + from concurrent.futures import ProcessPoolExecutor + if iterables and "chunksize" not in tqdm_kwargs: + # default `chunksize=1` has poor performance for large iterables + # (most time spent dispatching items to workers). + longest_iterable_len = max(map(length_hint, iterables)) + if longest_iterable_len > 1000: + from warnings import warn + warn("Iterable length %d > 1000 but `chunksize` is not set." + " This may seriously degrade multiprocess performance." + " Set `chunksize=1` or more." % longest_iterable_len, + TqdmWarning, stacklevel=2) + return _executor_map(ProcessPoolExecutor, fn, *iterables, **tqdm_kwargs) diff --git a/libs/tqdm/contrib/itertools.py b/libs/tqdm/contrib/itertools.py new file mode 100644 index 000000000..0f2a2a42b --- /dev/null +++ b/libs/tqdm/contrib/itertools.py @@ -0,0 +1,34 @@ +""" +Thin wrappers around `itertools`. +""" +from __future__ import absolute_import +from tqdm.auto import tqdm as tqdm_auto +from copy import deepcopy +import itertools +__author__ = {"github.com/": ["casperdcl"]} +__all__ = ['product'] + + +def product(*iterables, **tqdm_kwargs): + """ + Equivalent of `itertools.product`. + + Parameters + ---------- + tqdm_class : [default: tqdm.auto.tqdm]. + """ + kwargs = deepcopy(tqdm_kwargs) + tqdm_class = kwargs.pop("tqdm_class", tqdm_auto) + try: + lens = list(map(len, iterables)) + except TypeError: + total = None + else: + total = 1 + for i in lens: + total *= i + kwargs.setdefault("total", total) + with tqdm_class(**kwargs) as t: + for i in itertools.product(*iterables): + yield i + t.update() diff --git a/libs/tqdm/contrib/telegram.py b/libs/tqdm/contrib/telegram.py new file mode 100644 index 000000000..5654dc99c --- /dev/null +++ b/libs/tqdm/contrib/telegram.py @@ -0,0 +1,136 @@ +""" +Sends updates to a Telegram bot. +""" +from __future__ import absolute_import + +from concurrent.futures import ThreadPoolExecutor +from requests import Session + +from tqdm.auto import tqdm as tqdm_auto +from tqdm.utils import _range +__author__ = {"github.com/": ["casperdcl"]} +__all__ = ['TelegramIO', 'tqdm_telegram', 'ttgrange', 'tqdm', 'trange'] + + +class TelegramIO(): + """Non-blocking file-like IO to a Telegram Bot.""" + API = 'https://api.telegram.org/bot' + + def __init__(self, token, chat_id): + """Creates a new message in the given `chat_id`.""" + self.token = token + self.chat_id = chat_id + self.session = session = Session() + self.text = self.__class__.__name__ + self.pool = ThreadPoolExecutor() + self.futures = [] + try: + res = session.post( + self.API + '%s/sendMessage' % self.token, + data=dict(text='`' + self.text + '`', chat_id=self.chat_id, + parse_mode='MarkdownV2')) + except Exception as e: + tqdm_auto.write(str(e)) + else: + self.message_id = res.json()['result']['message_id'] + + def write(self, s): + """Replaces internal `message_id`'s text with `s`.""" + if not s: + return + s = s.strip().replace('\r', '') + if s == self.text: + return # avoid duplicate message Bot error + self.text = s + try: + f = self.pool.submit( + self.session.post, + self.API + '%s/editMessageText' % self.token, + data=dict( + text='`' + s + '`', chat_id=self.chat_id, + message_id=self.message_id, parse_mode='MarkdownV2')) + except Exception as e: + tqdm_auto.write(str(e)) + else: + self.futures.append(f) + return f + + def flush(self): + """Ensure the last `write` has been processed.""" + [f.cancel() for f in self.futures[-2::-1]] + try: + return self.futures[-1].result() + except IndexError: + pass + finally: + self.futures = [] + + def __del__(self): + self.flush() + + +class tqdm_telegram(tqdm_auto): + """ + Standard `tqdm.auto.tqdm` but also sends updates to a Telegram bot. + May take a few seconds to create (`__init__`) and clear (`__del__`). + + >>> from tqdm.contrib.telegram import tqdm, trange + >>> for i in tqdm( + ... iterable, + ... token='1234567890:THIS1SSOMETOKEN0BTAINeDfrOmTELEGrAM', + ... chat_id='0246813579'): + """ + def __init__(self, *args, **kwargs): + """ + Parameters + ---------- + token : str, required. Telegram token. + chat_id : str, required. Telegram chat ID. + + See `tqdm.auto.tqdm.__init__` for other parameters. + """ + self.tgio = TelegramIO(kwargs.pop('token'), kwargs.pop('chat_id')) + super(tqdm_telegram, self).__init__(*args, **kwargs) + + def display(self, **kwargs): + super(tqdm_telegram, self).display(**kwargs) + fmt = self.format_dict + if 'bar_format' in fmt and fmt['bar_format']: + fmt['bar_format'] = fmt['bar_format'].replace('<bar/>', '{bar}') + else: + fmt['bar_format'] = '{l_bar}{bar}{r_bar}' + fmt['bar_format'] = fmt['bar_format'].replace('{bar}', '{bar:10u}') + self.tgio.write(self.format_meter(**fmt)) + + def __new__(cls, *args, **kwargs): + """ + Workaround for mixed-class same-stream nested progressbars. + See [#509](https://github.com/tqdm/tqdm/issues/509) + """ + with cls.get_lock(): + try: + cls._instances = tqdm_auto._instances + except AttributeError: + pass + instance = super(tqdm_telegram, cls).__new__(cls, *args, **kwargs) + with cls.get_lock(): + try: + # `tqdm_auto` may have been changed so update + cls._instances.update(tqdm_auto._instances) + except AttributeError: + pass + tqdm_auto._instances = cls._instances + return instance + + +def ttgrange(*args, **kwargs): + """ + A shortcut for `tqdm.contrib.telegram.tqdm(xrange(*args), **kwargs)`. + On Python3+, `range` is used instead of `xrange`. + """ + return tqdm_telegram(_range(*args), **kwargs) + + +# Aliases +tqdm = tqdm_telegram +trange = ttgrange diff --git a/libs/tqdm/gui.py b/libs/tqdm/gui.py new file mode 100644 index 000000000..35f5c5e55 --- /dev/null +++ b/libs/tqdm/gui.py @@ -0,0 +1,321 @@ +""" +GUI progressbar decorator for iterators. +Includes a default (x)range iterator printing to stderr. + +Usage: + >>> from tqdm.gui import trange[, tqdm] + >>> for i in trange(10): #same as: for i in tqdm(xrange(10)) + ... ... +""" +# future division is important to divide integers and get as +# a result precise floating numbers (instead of truncated int) +from __future__ import division, absolute_import +# import compatibility functions and utilities +from .utils import _range +# to inherit from the tqdm class +from .std import tqdm as std_tqdm +from .std import TqdmExperimentalWarning +from warnings import warn + + +__author__ = {"github.com/": ["casperdcl", "lrq3000"]} +__all__ = ['tqdm_gui', 'tgrange', 'tqdm', 'trange'] + + +class tqdm_gui(std_tqdm): # pragma: no cover + """ + Experimental GUI version of tqdm! + """ + + # TODO: @classmethod: write() on GUI? + + def __init__(self, *args, **kwargs): + import matplotlib as mpl + import matplotlib.pyplot as plt + from collections import deque + kwargs['gui'] = True + + super(tqdm_gui, self).__init__(*args, **kwargs) + + # Initialize the GUI display + if self.disable or not kwargs['gui']: + return + + warn('GUI is experimental/alpha', TqdmExperimentalWarning, stacklevel=2) + self.mpl = mpl + self.plt = plt + self.sp = None + + # Remember if external environment uses toolbars + self.toolbar = self.mpl.rcParams['toolbar'] + self.mpl.rcParams['toolbar'] = 'None' + + self.mininterval = max(self.mininterval, 0.5) + self.fig, ax = plt.subplots(figsize=(9, 2.2)) + # self.fig.subplots_adjust(bottom=0.2) + total = len(self) + if total is not None: + self.xdata = [] + self.ydata = [] + self.zdata = [] + else: + self.xdata = deque([]) + self.ydata = deque([]) + self.zdata = deque([]) + self.line1, = ax.plot(self.xdata, self.ydata, color='b') + self.line2, = ax.plot(self.xdata, self.zdata, color='k') + ax.set_ylim(0, 0.001) + if total is not None: + ax.set_xlim(0, 100) + ax.set_xlabel('percent') + self.fig.legend((self.line1, self.line2), ('cur', 'est'), + loc='center right') + # progressbar + self.hspan = plt.axhspan(0, 0.001, + xmin=0, xmax=0, color='g') + else: + # ax.set_xlim(-60, 0) + ax.set_xlim(0, 60) + ax.invert_xaxis() + ax.set_xlabel('seconds') + ax.legend(('cur', 'est'), loc='lower left') + ax.grid() + # ax.set_xlabel('seconds') + ax.set_ylabel((self.unit if self.unit else 'it') + '/s') + if self.unit_scale: + plt.ticklabel_format(style='sci', axis='y', + scilimits=(0, 0)) + ax.yaxis.get_offset_text().set_x(-0.15) + + # Remember if external environment is interactive + self.wasion = plt.isinteractive() + plt.ion() + self.ax = ax + + def __iter__(self): + # TODO: somehow allow the following: + # if not self.gui: + # return super(tqdm_gui, self).__iter__() + iterable = self.iterable + if self.disable: + for obj in iterable: + yield obj + return + + # ncols = self.ncols + mininterval = self.mininterval + maxinterval = self.maxinterval + miniters = self.miniters + dynamic_miniters = self.dynamic_miniters + last_print_t = self.last_print_t + last_print_n = self.last_print_n + n = self.n + # dynamic_ncols = self.dynamic_ncols + smoothing = self.smoothing + avg_time = self.avg_time + time = self._time + + for obj in iterable: + yield obj + # Update and possibly print the progressbar. + # Note: does not call self.update(1) for speed optimisation. + n += 1 + # check counter first to avoid calls to time() + if n - last_print_n >= self.miniters: + miniters = self.miniters # watch monitoring thread changes + delta_t = time() - last_print_t + if delta_t >= mininterval: + cur_t = time() + delta_it = n - last_print_n + # EMA (not just overall average) + if smoothing and delta_t and delta_it: + rate = delta_t / delta_it + avg_time = self.ema(rate, avg_time, smoothing) + self.avg_time = avg_time + + self.n = n + self.display() + + # If no `miniters` was specified, adjust automatically + # to the max iteration rate seen so far between 2 prints + if dynamic_miniters: + if maxinterval and delta_t >= maxinterval: + # Adjust miniters to time interval by rule of 3 + if mininterval: + # Set miniters to correspond to mininterval + miniters = delta_it * mininterval / delta_t + else: + # Set miniters to correspond to maxinterval + miniters = delta_it * maxinterval / delta_t + elif smoothing: + # EMA-weight miniters to converge + # towards the timeframe of mininterval + rate = delta_it + if mininterval and delta_t: + rate *= mininterval / delta_t + miniters = self.ema(rate, miniters, smoothing) + else: + # Maximum nb of iterations between 2 prints + miniters = max(miniters, delta_it) + + # Store old values for next call + self.n = self.last_print_n = last_print_n = n + self.last_print_t = last_print_t = cur_t + self.miniters = miniters + + # Closing the progress bar. + # Update some internal variables for close(). + self.last_print_n = last_print_n + self.n = n + self.miniters = miniters + self.close() + + def update(self, n=1): + # if not self.gui: + # return super(tqdm_gui, self).close() + if self.disable: + return + + if n < 0: + self.last_print_n += n # for auto-refresh logic to work + self.n += n + + # check counter first to reduce calls to time() + if self.n - self.last_print_n >= self.miniters: + delta_t = self._time() - self.last_print_t + if delta_t >= self.mininterval: + cur_t = self._time() + delta_it = self.n - self.last_print_n # >= n + # elapsed = cur_t - self.start_t + # EMA (not just overall average) + if self.smoothing and delta_t and delta_it: + rate = delta_t / delta_it + self.avg_time = self.ema( + rate, self.avg_time, self.smoothing) + + self.display() + + # If no `miniters` was specified, adjust automatically to the + # maximum iteration rate seen so far between two prints. + # e.g.: After running `tqdm.update(5)`, subsequent + # calls to `tqdm.update()` will only cause an update after + # at least 5 more iterations. + if self.dynamic_miniters: + if self.maxinterval and delta_t >= self.maxinterval: + if self.mininterval: + self.miniters = delta_it * self.mininterval \ + / delta_t + else: + self.miniters = delta_it * self.maxinterval \ + / delta_t + elif self.smoothing: + self.miniters = self.smoothing * delta_it * \ + (self.mininterval / delta_t + if self.mininterval and delta_t + else 1) + \ + (1 - self.smoothing) * self.miniters + else: + self.miniters = max(self.miniters, delta_it) + + # Store old values for next call + self.last_print_n = self.n + self.last_print_t = cur_t + + def close(self): + # if not self.gui: + # return super(tqdm_gui, self).close() + if self.disable: + return + + self.disable = True + + with self.get_lock(): + self._instances.remove(self) + + # Restore toolbars + self.mpl.rcParams['toolbar'] = self.toolbar + # Return to non-interactive mode + if not self.wasion: + self.plt.ioff() + if not self.leave: + self.plt.close(self.fig) + + def display(self): + n = self.n + cur_t = self._time() + elapsed = cur_t - self.start_t + delta_it = n - self.last_print_n + delta_t = cur_t - self.last_print_t + + # Inline due to multiple calls + total = self.total + xdata = self.xdata + ydata = self.ydata + zdata = self.zdata + ax = self.ax + line1 = self.line1 + line2 = self.line2 + # instantaneous rate + y = delta_it / delta_t + # overall rate + z = n / elapsed + # update line data + xdata.append(n * 100.0 / total if total else cur_t) + ydata.append(y) + zdata.append(z) + + # Discard old values + # xmin, xmax = ax.get_xlim() + # if (not total) and elapsed > xmin * 1.1: + if (not total) and elapsed > 66: + xdata.popleft() + ydata.popleft() + zdata.popleft() + + ymin, ymax = ax.get_ylim() + if y > ymax or z > ymax: + ymax = 1.1 * y + ax.set_ylim(ymin, ymax) + ax.figure.canvas.draw() + + if total: + line1.set_data(xdata, ydata) + line2.set_data(xdata, zdata) + try: + poly_lims = self.hspan.get_xy() + except AttributeError: + self.hspan = self.plt.axhspan( + 0, 0.001, xmin=0, xmax=0, color='g') + poly_lims = self.hspan.get_xy() + poly_lims[0, 1] = ymin + poly_lims[1, 1] = ymax + poly_lims[2] = [n / total, ymax] + poly_lims[3] = [poly_lims[2, 0], ymin] + if len(poly_lims) > 4: + poly_lims[4, 1] = ymin + self.hspan.set_xy(poly_lims) + else: + t_ago = [cur_t - i for i in xdata] + line1.set_data(t_ago, ydata) + line2.set_data(t_ago, zdata) + + ax.set_title(self.format_meter( + n, total, elapsed, 0, + self.desc, self.ascii, self.unit, self.unit_scale, + 1 / self.avg_time if self.avg_time else None, self.bar_format, + self.postfix, self.unit_divisor), + fontname="DejaVu Sans Mono", fontsize=11) + self.plt.pause(1e-9) + + +def tgrange(*args, **kwargs): + """ + A shortcut for `tqdm.gui.tqdm(xrange(*args), **kwargs)`. + On Python3+, `range` is used instead of `xrange`. + """ + return tqdm_gui(_range(*args), **kwargs) + + +# Aliases +tqdm = tqdm_gui +trange = tgrange diff --git a/libs/tqdm/keras.py b/libs/tqdm/keras.py new file mode 100644 index 000000000..27623c099 --- /dev/null +++ b/libs/tqdm/keras.py @@ -0,0 +1,105 @@ +from __future__ import absolute_import, division +from .auto import tqdm as tqdm_auto +from copy import deepcopy +try: + import keras +except ImportError as e: + try: + from tensorflow import keras + except ImportError: + raise e +__author__ = {"github.com/": ["casperdcl"]} +__all__ = ['TqdmCallback'] + + +class TqdmCallback(keras.callbacks.Callback): + """`keras` callback for epoch and batch progress""" + @staticmethod + def bar2callback(bar, pop=None, delta=(lambda logs: 1)): + def callback(_, logs=None): + n = delta(logs) + if logs: + if pop: + logs = deepcopy(logs) + [logs.pop(i, 0) for i in pop] + bar.set_postfix(logs, refresh=False) + bar.update(n) + + return callback + + def __init__(self, epochs=None, data_size=None, batch_size=None, verbose=1, + tqdm_class=tqdm_auto): + """ + Parameters + ---------- + epochs : int, optional + data_size : int, optional + Number of training pairs. + batch_size : int, optional + Number of training pairs per batch. + verbose : int + 0: epoch, 1: batch (transient), 2: batch. [default: 1]. + Will be set to `0` unless both `data_size` and `batch_size` + are given. + tqdm_class : optional + `tqdm` class to use for bars [default: `tqdm.auto.tqdm`]. + """ + self.tqdm_class = tqdm_class + self.epoch_bar = tqdm_class(total=epochs, unit='epoch') + self.on_epoch_end = self.bar2callback(self.epoch_bar) + if data_size and batch_size: + self.batches = batches = (data_size + batch_size - 1) // batch_size + else: + self.batches = batches = None + self.verbose = verbose + if verbose == 1: + self.batch_bar = tqdm_class(total=batches, unit='batch', + leave=False) + self.on_batch_end = self.bar2callback( + self.batch_bar, + pop=['batch', 'size'], + delta=lambda logs: logs.get('size', 1)) + + def on_train_begin(self, *_, **__): + params = self.params.get + auto_total = params('epochs', params('nb_epoch', None)) + if auto_total is not None: + self.epoch_bar.reset(total=auto_total) + + def on_epoch_begin(self, *_, **__): + if self.verbose: + params = self.params.get + total = params('samples', params( + 'nb_sample', params('steps', None))) or self.batches + if self.verbose == 2: + if hasattr(self, 'batch_bar'): + self.batch_bar.close() + self.batch_bar = self.tqdm_class( + total=total, unit='batch', leave=True, + unit_scale=1 / (params('batch_size', 1) or 1)) + self.on_batch_end = self.bar2callback( + self.batch_bar, + pop=['batch', 'size'], + delta=lambda logs: logs.get('size', 1)) + elif self.verbose == 1: + self.batch_bar.unit_scale = 1 / (params('batch_size', 1) or 1) + self.batch_bar.reset(total=total) + else: + raise KeyError('Unknown verbosity') + + def on_train_end(self, *_, **__): + if self.verbose: + self.batch_bar.close() + self.epoch_bar.close() + + @staticmethod + def _implements_train_batch_hooks(): + return True + + @staticmethod + def _implements_test_batch_hooks(): + return True + + @staticmethod + def _implements_predict_batch_hooks(): + return True diff --git a/libs/tqdm/notebook.py b/libs/tqdm/notebook.py new file mode 100644 index 000000000..570510370 --- /dev/null +++ b/libs/tqdm/notebook.py @@ -0,0 +1,282 @@ +""" +IPython/Jupyter Notebook progressbar decorator for iterators. +Includes a default (x)range iterator printing to stderr. + +Usage: + >>> from tqdm.notebook import trange[, tqdm] + >>> for i in trange(10): #same as: for i in tqdm(xrange(10)) + ... ... +""" +# future division is important to divide integers and get as +# a result precise floating numbers (instead of truncated int) +from __future__ import division, absolute_import +# import compatibility functions and utilities +import sys +from .utils import _range +# to inherit from the tqdm class +from .std import tqdm as std_tqdm + + +if True: # pragma: no cover + # import IPython/Jupyter base widget and display utilities + IPY = 0 + IPYW = 0 + try: # IPython 4.x + import ipywidgets + IPY = 4 + try: + IPYW = int(ipywidgets.__version__.split('.')[0]) + except AttributeError: # __version__ may not exist in old versions + pass + except ImportError: # IPython 3.x / 2.x + IPY = 32 + import warnings + with warnings.catch_warnings(): + warnings.filterwarnings( + 'ignore', + message=".*The `IPython.html` package has been deprecated.*") + try: + import IPython.html.widgets as ipywidgets + except ImportError: + pass + + try: # IPython 4.x / 3.x + if IPY == 32: + from IPython.html.widgets import FloatProgress as IProgress + from IPython.html.widgets import HBox, HTML + IPY = 3 + else: + from ipywidgets import FloatProgress as IProgress + from ipywidgets import HBox, HTML + except ImportError: + try: # IPython 2.x + from IPython.html.widgets import FloatProgressWidget as IProgress + from IPython.html.widgets import ContainerWidget as HBox + from IPython.html.widgets import HTML + IPY = 2 + except ImportError: + IPY = 0 + + try: + from IPython.display import display # , clear_output + except ImportError: + pass + + # HTML encoding + try: # Py3 + from html import escape + except ImportError: # Py2 + from cgi import escape + + +__author__ = {"github.com/": ["lrq3000", "casperdcl", "alexanderkuk"]} +__all__ = ['tqdm_notebook', 'tnrange', 'tqdm', 'trange'] + + +class tqdm_notebook(std_tqdm): + """ + Experimental IPython/Jupyter Notebook widget using tqdm! + """ + + @staticmethod + def status_printer(_, total=None, desc=None, ncols=None): + """ + Manage the printing of an IPython/Jupyter Notebook progress bar widget. + """ + # Fallback to text bar if there's no total + # DEPRECATED: replaced with an 'info' style bar + # if not total: + # return super(tqdm_notebook, tqdm_notebook).status_printer(file) + + # fp = file + + # Prepare IPython progress bar + try: + if total: + pbar = IProgress(min=0, max=total) + else: # No total? Show info style bar with no progress tqdm status + pbar = IProgress(min=0, max=1) + pbar.value = 1 + pbar.bar_style = 'info' + except NameError: + # #187 #451 #558 + raise ImportError( + "FloatProgress not found. Please update jupyter and ipywidgets." + " See https://ipywidgets.readthedocs.io/en/stable" + "/user_install.html") + + if desc: + pbar.description = desc + if IPYW >= 7: + pbar.style.description_width = 'initial' + # Prepare status text + ptext = HTML() + # Only way to place text to the right of the bar is to use a container + container = HBox(children=[pbar, ptext]) + # Prepare layout + if ncols is not None: # use default style of ipywidgets + # ncols could be 100, "100px", "100%" + ncols = str(ncols) # ipywidgets only accepts string + try: + if int(ncols) > 0: # isnumeric and positive + ncols += 'px' + except ValueError: + pass + pbar.layout.flex = '2' + container.layout.width = ncols + container.layout.display = 'inline-flex' + container.layout.flex_flow = 'row wrap' + display(container) + + return container + + def display(self, msg=None, pos=None, + # additional signals + close=False, bar_style=None): + # Note: contrary to native tqdm, msg='' does NOT clear bar + # goal is to keep all infos if error happens so user knows + # at which iteration the loop failed. + + # Clear previous output (really necessary?) + # clear_output(wait=1) + + if not msg and not close: + msg = self.__repr__() + + pbar, ptext = self.container.children + pbar.value = self.n + + if msg: + # html escape special characters (like '&') + if '<bar/>' in msg: + left, right = map(escape, msg.split('<bar/>', 1)) + else: + left, right = '', escape(msg) + + # remove inesthetical pipes + if left and left[-1] == '|': + left = left[:-1] + if right and right[0] == '|': + right = right[1:] + + # Update description + pbar.description = left + if IPYW >= 7: + pbar.style.description_width = 'initial' + + # never clear the bar (signal: msg='') + if right: + ptext.value = right + + # Change bar style + if bar_style: + # Hack-ish way to avoid the danger bar_style being overridden by + # success because the bar gets closed after the error... + if not (pbar.bar_style == 'danger' and bar_style == 'success'): + pbar.bar_style = bar_style + + # Special signal to close the bar + if close and pbar.bar_style != 'danger': # hide only if no error + try: + self.container.close() + except AttributeError: + self.container.visible = False + + def __init__(self, *args, **kwargs): + # Setup default output + file_kwarg = kwargs.get('file', sys.stderr) + if file_kwarg is sys.stderr or file_kwarg is None: + kwargs['file'] = sys.stdout # avoid the red block in IPython + + # Initialize parent class + avoid printing by using gui=True + kwargs['gui'] = True + kwargs.setdefault('bar_format', '{l_bar}{bar}{r_bar}') + kwargs['bar_format'] = kwargs['bar_format'].replace('{bar}', '<bar/>') + # convert disable = None to False + kwargs['disable'] = bool(kwargs.get('disable', False)) + super(tqdm_notebook, self).__init__(*args, **kwargs) + if self.disable or not kwargs['gui']: + self.sp = lambda *_, **__: None + return + + # Get bar width + self.ncols = '100%' if self.dynamic_ncols else kwargs.get("ncols", None) + + # Replace with IPython progress bar display (with correct total) + unit_scale = 1 if self.unit_scale is True else self.unit_scale or 1 + total = self.total * unit_scale if self.total else self.total + self.container = self.status_printer( + self.fp, total, self.desc, self.ncols) + self.sp = self.display + + # Print initial bar state + if not self.disable: + self.display() + + def __iter__(self, *args, **kwargs): + try: + for obj in super(tqdm_notebook, self).__iter__(*args, **kwargs): + # return super(tqdm...) will not catch exception + yield obj + # NB: except ... [ as ...] breaks IPython async KeyboardInterrupt + except: # NOQA + self.sp(bar_style='danger') + raise + # NB: don't `finally: close()` + # since this could be a shared bar which the user will `reset()` + + def update(self, *args, **kwargs): + try: + super(tqdm_notebook, self).update(*args, **kwargs) + # NB: except ... [ as ...] breaks IPython async KeyboardInterrupt + except: # NOQA + # cannot catch KeyboardInterrupt when using manual tqdm + # as the interrupt will most likely happen on another statement + self.sp(bar_style='danger') + raise + # NB: don't `finally: close()` + # since this could be a shared bar which the user will `reset()` + + def close(self, *args, **kwargs): + super(tqdm_notebook, self).close(*args, **kwargs) + # Try to detect if there was an error or KeyboardInterrupt + # in manual mode: if n < total, things probably got wrong + if self.total and self.n < self.total: + self.sp(bar_style='danger') + else: + if self.leave: + self.sp(bar_style='success') + else: + self.sp(close=True) + + def moveto(self, *args, **kwargs): + # void -> avoid extraneous `\n` in IPython output cell + return + + def reset(self, total=None): + """ + Resets to 0 iterations for repeated use. + + Consider combining with `leave=True`. + + Parameters + ---------- + total : int or float, optional. Total to use for the new bar. + """ + if total is not None: + pbar, _ = self.container.children + pbar.max = total + return super(tqdm_notebook, self).reset(total=total) + + +def tnrange(*args, **kwargs): + """ + A shortcut for `tqdm.notebook.tqdm(xrange(*args), **kwargs)`. + On Python3+, `range` is used instead of `xrange`. + """ + return tqdm_notebook(_range(*args), **kwargs) + + +# Aliases +tqdm = tqdm_notebook +trange = tnrange diff --git a/libs/tqdm/std.py b/libs/tqdm/std.py new file mode 100644 index 000000000..0cdc8e6b7 --- /dev/null +++ b/libs/tqdm/std.py @@ -0,0 +1,1503 @@ +""" +Customisable progressbar decorator for iterators. +Includes a default (x)range iterator printing to stderr. + +Usage: + >>> from tqdm import trange[, tqdm] + >>> for i in trange(10): #same as: for i in tqdm(xrange(10)) + ... ... +""" +from __future__ import absolute_import, division +# compatibility functions and utilities +from .utils import _supports_unicode, _screen_shape_wrapper, _range, _unich, \ + _term_move_up, _unicode, WeakSet, _basestring, _OrderedDict, \ + Comparable, _is_ascii, FormatReplace, disp_len, disp_trim, \ + SimpleTextIOWrapper, CallbackIOWrapper +from ._monitor import TMonitor +# native libraries +from contextlib import contextmanager +import sys +from numbers import Number +from time import time +# For parallelism safety +import threading as th +from warnings import warn + +__author__ = {"github.com/": ["noamraph", "obiwanus", "kmike", "hadim", + "casperdcl", "lrq3000"]} +__all__ = ['tqdm', 'trange', + 'TqdmTypeError', 'TqdmKeyError', 'TqdmWarning', + 'TqdmExperimentalWarning', 'TqdmDeprecationWarning', + 'TqdmMonitorWarning'] + + +class TqdmTypeError(TypeError): + pass + + +class TqdmKeyError(KeyError): + pass + + +class TqdmWarning(Warning): + """base class for all tqdm warnings. + + Used for non-external-code-breaking errors, such as garbled printing. + """ + def __init__(self, msg, fp_write=None, *a, **k): + if fp_write is not None: + fp_write("\n" + self.__class__.__name__ + ": " + + str(msg).rstrip() + '\n') + else: + super(TqdmWarning, self).__init__(msg, *a, **k) + + +class TqdmExperimentalWarning(TqdmWarning, FutureWarning): + """beta feature, unstable API and behaviour""" + pass + + +class TqdmDeprecationWarning(TqdmWarning, DeprecationWarning): + # not suppressed if raised + pass + + +class TqdmMonitorWarning(TqdmWarning, RuntimeWarning): + """tqdm monitor errors which do not affect external functionality""" + pass + + +class TqdmDefaultWriteLock(object): + """ + Provide a default write lock for thread and multiprocessing safety. + Works only on platforms supporting `fork` (so Windows is excluded). + You must initialise a `tqdm` or `TqdmDefaultWriteLock` instance + before forking in order for the write lock to work. + On Windows, you need to supply the lock from the parent to the children as + an argument to joblib or the parallelism lib you use. + """ + def __init__(self): + # Create global parallelism locks to avoid racing issues with parallel + # bars works only if fork available (Linux/MacOSX, but not Windows) + self.create_mp_lock() + self.create_th_lock() + cls = type(self) + self.locks = [lk for lk in [cls.mp_lock, cls.th_lock] if lk is not None] + + def acquire(self, *a, **k): + for lock in self.locks: + lock.acquire(*a, **k) + + def release(self): + for lock in self.locks[::-1]: # Release in inverse order of acquisition + lock.release() + + def __enter__(self): + self.acquire() + + def __exit__(self, *exc): + self.release() + + @classmethod + def create_mp_lock(cls): + if not hasattr(cls, 'mp_lock'): + try: + from multiprocessing import RLock + cls.mp_lock = RLock() # multiprocessing lock + except ImportError: # pragma: no cover + cls.mp_lock = None + except OSError: # pragma: no cover + cls.mp_lock = None + + @classmethod + def create_th_lock(cls): + if not hasattr(cls, 'th_lock'): + try: + cls.th_lock = th.RLock() # thread lock + except OSError: # pragma: no cover + cls.th_lock = None + + +# Create a thread lock before instantiation so that no setup needs to be done +# before running in a multithreaded environment. +# Do not create the multiprocessing lock because it sets the multiprocessing +# context and does not allow the user to use 'spawn' or 'forkserver' methods. +TqdmDefaultWriteLock.create_th_lock() + + +class Bar(object): + """ + `str.format`-able bar with format specifiers: `[width][type]` + + - `width` + + unspecified (default): use `self.default_len` + + `int >= 0`: overrides `self.default_len` + + `int < 0`: subtract from `self.default_len` + - `type` + + `a`: ascii (`charset=self.ASCII` override) + + `u`: unicode (`charset=self.UTF` override) + + `b`: blank (`charset=" "` override) + """ + ASCII = " 123456789#" + UTF = u" " + u''.join(map(_unich, range(0x258F, 0x2587, -1))) + BLANK = " " + + def __init__(self, frac, default_len=10, charset=UTF): + if not (0 <= frac <= 1): + warn("clamping frac to range [0, 1]", TqdmWarning, stacklevel=2) + frac = max(0, min(1, frac)) + assert default_len > 0 + self.frac = frac + self.default_len = default_len + self.charset = charset + + def __format__(self, format_spec): + if format_spec: + _type = format_spec[-1].lower() + try: + charset = dict(a=self.ASCII, u=self.UTF, b=self.BLANK)[_type] + except KeyError: + charset = self.charset + else: + format_spec = format_spec[:-1] + if format_spec: + N_BARS = int(format_spec) + if N_BARS < 0: + N_BARS += self.default_len + else: + N_BARS = self.default_len + else: + charset = self.charset + N_BARS = self.default_len + + nsyms = len(charset) - 1 + bar_length, frac_bar_length = divmod( + int(self.frac * N_BARS * nsyms), nsyms) + + bar = charset[-1] * bar_length + frac_bar = charset[frac_bar_length] + + # whitespace padding + if bar_length < N_BARS: + return bar + frac_bar + \ + charset[0] * (N_BARS - bar_length - 1) + return bar + + +class tqdm(Comparable): + """ + Decorate an iterable object, returning an iterator which acts exactly + like the original iterable, but prints a dynamically updating + progressbar every time a value is requested. + """ + + monitor_interval = 10 # set to 0 to disable the thread + monitor = None + + @staticmethod + def format_sizeof(num, suffix='', divisor=1000): + """ + Formats a number (greater than unity) with SI Order of Magnitude + prefixes. + + Parameters + ---------- + num : float + Number ( >= 1) to format. + suffix : str, optional + Post-postfix [default: '']. + divisor : float, optional + Divisor between prefixes [default: 1000]. + + Returns + ------- + out : str + Number with Order of Magnitude SI unit postfix. + """ + for unit in ['', 'k', 'M', 'G', 'T', 'P', 'E', 'Z']: + if abs(num) < 999.5: + if abs(num) < 99.95: + if abs(num) < 9.995: + return '{0:1.2f}'.format(num) + unit + suffix + return '{0:2.1f}'.format(num) + unit + suffix + return '{0:3.0f}'.format(num) + unit + suffix + num /= divisor + return '{0:3.1f}Y'.format(num) + suffix + + @staticmethod + def format_interval(t): + """ + Formats a number of seconds as a clock time, [H:]MM:SS + + Parameters + ---------- + t : int + Number of seconds. + + Returns + ------- + out : str + [H:]MM:SS + """ + mins, s = divmod(int(t), 60) + h, m = divmod(mins, 60) + if h: + return '{0:d}:{1:02d}:{2:02d}'.format(h, m, s) + else: + return '{0:02d}:{1:02d}'.format(m, s) + + @staticmethod + def format_num(n): + """ + Intelligent scientific notation (.3g). + + Parameters + ---------- + n : int or float or Numeric + A Number. + + Returns + ------- + out : str + Formatted number. + """ + f = '{0:.3g}'.format(n).replace('+0', '+').replace('-0', '-') + n = str(n) + return f if len(f) < len(n) else n + + @staticmethod + def ema(x, mu=None, alpha=0.3): + """ + Exponential moving average: smoothing to give progressively lower + weights to older values. + + Parameters + ---------- + x : float + New value to include in EMA. + mu : float, optional + Previous EMA value. + alpha : float, optional + Smoothing factor in range [0, 1], [default: 0.3]. + Increase to give more weight to recent values. + Ranges from 0 (yields mu) to 1 (yields x). + """ + return x if mu is None else (alpha * x) + (1 - alpha) * mu + + @staticmethod + def status_printer(file): + """ + Manage the printing and in-place updating of a line of characters. + Note that if the string is longer than a line, then in-place + updating may not work (it will print a new line at each refresh). + """ + fp = file + fp_flush = getattr(fp, 'flush', lambda: None) # pragma: no cover + + def fp_write(s): + fp.write(_unicode(s)) + fp_flush() + + last_len = [0] + + def print_status(s): + len_s = len(s) + fp_write('\r' + s + (' ' * max(last_len[0] - len_s, 0))) + last_len[0] = len_s + + return print_status + + @staticmethod + def format_meter(n, total, elapsed, ncols=None, prefix='', ascii=False, + unit='it', unit_scale=False, rate=None, bar_format=None, + postfix=None, unit_divisor=1000, **extra_kwargs): + """ + Return a string-based progress bar given some parameters + + Parameters + ---------- + n : int or float + Number of finished iterations. + total : int or float + The expected total number of iterations. If meaningless (None), + only basic progress statistics are displayed (no ETA). + elapsed : float + Number of seconds passed since start. + ncols : int, optional + The width of the entire output message. If specified, + dynamically resizes `{bar}` to stay within this bound + [default: None]. If `0`, will not print any bar (only stats). + The fallback is `{bar:10}`. + prefix : str, optional + Prefix message (included in total width) [default: '']. + Use as {desc} in bar_format string. + ascii : bool, optional or str, optional + If not set, use unicode (smooth blocks) to fill the meter + [default: False]. The fallback is to use ASCII characters + " 123456789#". + unit : str, optional + The iteration unit [default: 'it']. + unit_scale : bool or int or float, optional + If 1 or True, the number of iterations will be printed with an + appropriate SI metric prefix (k = 10^3, M = 10^6, etc.) + [default: False]. If any other non-zero number, will scale + `total` and `n`. + rate : float, optional + Manual override for iteration rate. + If [default: None], uses n/elapsed. + bar_format : str, optional + Specify a custom bar string formatting. May impact performance. + [default: '{l_bar}{bar}{r_bar}'], where + l_bar='{desc}: {percentage:3.0f}%|' and + r_bar='| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, ' + '{rate_fmt}{postfix}]' + Possible vars: l_bar, bar, r_bar, n, n_fmt, total, total_fmt, + percentage, elapsed, elapsed_s, ncols, nrows, desc, unit, + rate, rate_fmt, rate_noinv, rate_noinv_fmt, + rate_inv, rate_inv_fmt, postfix, unit_divisor, + remaining, remaining_s. + Note that a trailing ": " is automatically removed after {desc} + if the latter is empty. + postfix : *, optional + Similar to `prefix`, but placed at the end + (e.g. for additional stats). + Note: postfix is usually a string (not a dict) for this method, + and will if possible be set to postfix = ', ' + postfix. + However other types are supported (#382). + unit_divisor : float, optional + [default: 1000], ignored unless `unit_scale` is True. + + Returns + ------- + out : Formatted meter and stats, ready to display. + """ + + # sanity check: total + if total and n >= (total + 0.5): # allow float imprecision (#849) + total = None + + # apply custom scale if necessary + if unit_scale and unit_scale not in (True, 1): + if total: + total *= unit_scale + n *= unit_scale + if rate: + rate *= unit_scale # by default rate = 1 / self.avg_time + unit_scale = False + + elapsed_str = tqdm.format_interval(elapsed) + + # if unspecified, attempt to use rate = average speed + # (we allow manual override since predicting time is an arcane art) + if rate is None and elapsed: + rate = n / elapsed + inv_rate = 1 / rate if rate else None + format_sizeof = tqdm.format_sizeof + rate_noinv_fmt = ((format_sizeof(rate) if unit_scale else + '{0:5.2f}'.format(rate)) + if rate else '?') + unit + '/s' + rate_inv_fmt = ((format_sizeof(inv_rate) if unit_scale else + '{0:5.2f}'.format(inv_rate)) + if inv_rate else '?') + 's/' + unit + rate_fmt = rate_inv_fmt if inv_rate and inv_rate > 1 else rate_noinv_fmt + + if unit_scale: + n_fmt = format_sizeof(n, divisor=unit_divisor) + total_fmt = format_sizeof(total, divisor=unit_divisor) \ + if total is not None else '?' + else: + n_fmt = str(n) + total_fmt = str(total) if total is not None else '?' + + try: + postfix = ', ' + postfix if postfix else '' + except TypeError: + pass + + remaining = (total - n) / rate if rate and total else 0 + remaining_str = tqdm.format_interval(remaining) if rate else '?' + + # format the stats displayed to the left and right sides of the bar + if prefix: + # old prefix setup work around + bool_prefix_colon_already = (prefix[-2:] == ": ") + l_bar = prefix if bool_prefix_colon_already else prefix + ": " + else: + l_bar = '' + + r_bar = '| {0}/{1} [{2}<{3}, {4}{5}]'.format( + n_fmt, total_fmt, elapsed_str, remaining_str, rate_fmt, postfix) + + # Custom bar formatting + # Populate a dict with all available progress indicators + format_dict = dict( + # slight extension of self.format_dict + n=n, n_fmt=n_fmt, total=total, total_fmt=total_fmt, + elapsed=elapsed_str, elapsed_s=elapsed, + ncols=ncols, desc=prefix or '', unit=unit, + rate=inv_rate if inv_rate and inv_rate > 1 else rate, + rate_fmt=rate_fmt, rate_noinv=rate, + rate_noinv_fmt=rate_noinv_fmt, rate_inv=inv_rate, + rate_inv_fmt=rate_inv_fmt, + postfix=postfix, unit_divisor=unit_divisor, + # plus more useful definitions + remaining=remaining_str, remaining_s=remaining, + l_bar=l_bar, r_bar=r_bar, + **extra_kwargs) + + # total is known: we can predict some stats + if total: + # fractional and percentage progress + frac = n / total + percentage = frac * 100 + + l_bar += '{0:3.0f}%|'.format(percentage) + + if ncols == 0: + return l_bar[:-1] + r_bar[1:] + + format_dict.update(l_bar=l_bar) + if bar_format: + format_dict.update(percentage=percentage) + + # auto-remove colon for empty `desc` + if not prefix: + bar_format = bar_format.replace("{desc}: ", '') + else: + bar_format = "{l_bar}{bar}{r_bar}" + + full_bar = FormatReplace() + try: + nobar = bar_format.format(bar=full_bar, **format_dict) + except UnicodeEncodeError: + bar_format = _unicode(bar_format) + nobar = bar_format.format(bar=full_bar, **format_dict) + if not full_bar.format_called: + # no {bar}, we can just format and return + return nobar + + # Formatting progress bar space available for bar's display + full_bar = Bar( + frac, + max(1, ncols - disp_len(nobar)) + if ncols else 10, + charset=Bar.ASCII if ascii is True else ascii or Bar.UTF) + if not _is_ascii(full_bar.charset) and _is_ascii(bar_format): + bar_format = _unicode(bar_format) + res = bar_format.format(bar=full_bar, **format_dict) + return disp_trim(res, ncols) if ncols else res + + elif bar_format: + # user-specified bar_format but no total + l_bar += '|' + format_dict.update(l_bar=l_bar, percentage=0) + full_bar = FormatReplace() + nobar = bar_format.format(bar=full_bar, **format_dict) + if not full_bar.format_called: + return nobar + full_bar = Bar( + 0, + max(1, ncols - disp_len(nobar)) + if ncols else 10, + charset=Bar.BLANK) + res = bar_format.format(bar=full_bar, **format_dict) + return disp_trim(res, ncols) if ncols else res + else: + # no total: no progressbar, ETA, just progress stats + return ((prefix + ": ") if prefix else '') + \ + '{0}{1} [{2}, {3}{4}]'.format( + n_fmt, unit, elapsed_str, rate_fmt, postfix) + + def __new__(cls, *args, **kwargs): + # Create a new instance + instance = object.__new__(cls) + # Construct the lock if it does not exist + with cls.get_lock(): + # Add to the list of instances + if not hasattr(cls, '_instances'): + cls._instances = WeakSet() + cls._instances.add(instance) + # Create the monitoring thread + if cls.monitor_interval and (cls.monitor is None or not + cls.monitor.report()): + try: + cls.monitor = TMonitor(cls, cls.monitor_interval) + except Exception as e: # pragma: nocover + warn("tqdm:disabling monitor support" + " (monitor_interval = 0) due to:\n" + str(e), + TqdmMonitorWarning, stacklevel=2) + cls.monitor_interval = 0 + # Return the instance + return instance + + @classmethod + def _get_free_pos(cls, instance=None): + """Skips specified instance.""" + positions = set(abs(inst.pos) for inst in cls._instances + if inst is not instance and hasattr(inst, "pos")) + return min(set(range(len(positions) + 1)).difference(positions)) + + @classmethod + def _decr_instances(cls, instance): + """ + Remove from list and reposition another unfixed bar + to fill the new gap. + + This means that by default (where all nested bars are unfixed), + order is not maintained but screen flicker/blank space is minimised. + (tqdm<=4.44.1 moved ALL subsequent unfixed bars up.) + """ + with cls._lock: + try: + cls._instances.remove(instance) + except KeyError: + # if not instance.gui: # pragma: no cover + # raise + pass # py2: maybe magically removed already + # else: + if not instance.gui: + last = (instance.nrows or 20) - 1 + # find unfixed (`pos >= 0`) overflow (`pos >= nrows - 1`) + instances = list(filter( + lambda i: hasattr(i, "pos") and last <= i.pos, + cls._instances)) + # set first found to current `pos` + if instances: + inst = min(instances, key=lambda i: i.pos) + inst.clear(nolock=True) + inst.pos = abs(instance.pos) + # Kill monitor if no instances are left + if not cls._instances and cls.monitor: + try: + cls.monitor.exit() + del cls.monitor + except AttributeError: # pragma: nocover + pass + else: + cls.monitor = None + + @classmethod + def write(cls, s, file=None, end="\n", nolock=False): + """Print a message via tqdm (without overlap with bars).""" + fp = file if file is not None else sys.stdout + with cls.external_write_mode(file=file, nolock=nolock): + # Write the message + fp.write(s) + fp.write(end) + + @classmethod + @contextmanager + def external_write_mode(cls, file=None, nolock=False): + """ + Disable tqdm within context and refresh tqdm when exits. + Useful when writing to standard output stream + """ + fp = file if file is not None else sys.stdout + + try: + if not nolock: + cls.get_lock().acquire() + # Clear all bars + inst_cleared = [] + for inst in getattr(cls, '_instances', []): + # Clear instance if in the target output file + # or if write output + tqdm output are both either + # sys.stdout or sys.stderr (because both are mixed in terminal) + if hasattr(inst, "start_t") and (inst.fp == fp or all( + f in (sys.stdout, sys.stderr) for f in (fp, inst.fp))): + inst.clear(nolock=True) + inst_cleared.append(inst) + yield + # Force refresh display of bars we cleared + for inst in inst_cleared: + inst.refresh(nolock=True) + finally: + if not nolock: + cls._lock.release() + + @classmethod + def set_lock(cls, lock): + """Set the global lock.""" + cls._lock = lock + + @classmethod + def get_lock(cls): + """Get the global lock. Construct it if it does not exist.""" + if not hasattr(cls, '_lock'): + cls._lock = TqdmDefaultWriteLock() + return cls._lock + + @classmethod + def pandas(tclass, *targs, **tkwargs): + """ + Registers the given `tqdm` class with + pandas.core. + ( frame.DataFrame + | series.Series + | groupby.(generic.)DataFrameGroupBy + | groupby.(generic.)SeriesGroupBy + ).progress_apply + + A new instance will be create every time `progress_apply` is called, + and each instance will automatically close() upon completion. + + Parameters + ---------- + targs, tkwargs : arguments for the tqdm instance + + Examples + -------- + >>> import pandas as pd + >>> import numpy as np + >>> from tqdm import tqdm + >>> from tqdm.gui import tqdm as tqdm_gui + >>> + >>> df = pd.DataFrame(np.random.randint(0, 100, (100000, 6))) + >>> tqdm.pandas(ncols=50) # can use tqdm_gui, optional kwargs, etc + >>> # Now you can use `progress_apply` instead of `apply` + >>> df.groupby(0).progress_apply(lambda x: x**2) + + References + ---------- + https://stackoverflow.com/questions/18603270/ + progress-indicator-during-pandas-operations-python + """ + from pandas.core.frame import DataFrame + from pandas.core.series import Series + try: + from pandas import Panel + except ImportError: # TODO: pandas>0.25.2 + Panel = None + try: # pandas>=1.0.0 + from pandas.core.window.rolling import _Rolling_and_Expanding + except ImportError: + try: # pandas>=0.18.0 + from pandas.core.window import _Rolling_and_Expanding + except ImportError: # pragma: no cover + _Rolling_and_Expanding = None + try: # pandas>=0.25.0 + from pandas.core.groupby.generic import DataFrameGroupBy, \ + SeriesGroupBy # , NDFrameGroupBy + except ImportError: + try: # pandas>=0.23.0 + from pandas.core.groupby.groupby import DataFrameGroupBy, \ + SeriesGroupBy + except ImportError: + from pandas.core.groupby import DataFrameGroupBy, \ + SeriesGroupBy + try: # pandas>=0.23.0 + from pandas.core.groupby.groupby import GroupBy + except ImportError: + from pandas.core.groupby import GroupBy + + try: # pandas>=0.23.0 + from pandas.core.groupby.groupby import PanelGroupBy + except ImportError: + try: + from pandas.core.groupby import PanelGroupBy + except ImportError: # pandas>=0.25.0 + PanelGroupBy = None + + deprecated_t = [tkwargs.pop('deprecated_t', None)] + + def inner_generator(df_function='apply'): + def inner(df, func, *args, **kwargs): + """ + Parameters + ---------- + df : (DataFrame|Series)[GroupBy] + Data (may be grouped). + func : function + To be applied on the (grouped) data. + **kwargs : optional + Transmitted to `df.apply()`. + """ + + # Precompute total iterations + total = tkwargs.pop("total", getattr(df, 'ngroups', None)) + if total is None: # not grouped + if df_function == 'applymap': + total = df.size + elif isinstance(df, Series): + total = len(df) + elif _Rolling_and_Expanding is None or \ + not isinstance(df, _Rolling_and_Expanding): + # DataFrame or Panel + axis = kwargs.get('axis', 0) + if axis == 'index': + axis = 0 + elif axis == 'columns': + axis = 1 + # when axis=0, total is shape[axis1] + total = df.size // df.shape[axis] + + # Init bar + if deprecated_t[0] is not None: + t = deprecated_t[0] + deprecated_t[0] = None + else: + t = tclass(*targs, total=total, **tkwargs) + + if len(args) > 0: + # *args intentionally not supported (see #244, #299) + TqdmDeprecationWarning( + "Except func, normal arguments are intentionally" + + " not supported by" + + " `(DataFrame|Series|GroupBy).progress_apply`." + + " Use keyword arguments instead.", + fp_write=getattr(t.fp, 'write', sys.stderr.write)) + + try: + func = df._is_builtin_func(func) + except TypeError: + pass + + # Define bar updating wrapper + def wrapper(*args, **kwargs): + # update tbar correctly + # it seems `pandas apply` calls `func` twice + # on the first column/row to decide whether it can + # take a fast or slow code path; so stop when t.total==t.n + t.update(n=1 if not t.total or t.n < t.total else 0) + return func(*args, **kwargs) + + # Apply the provided function (in **kwargs) + # on the df using our wrapper (which provides bar updating) + try: + return getattr(df, df_function)(wrapper, **kwargs) + finally: + t.close() + + return inner + + # Monkeypatch pandas to provide easy methods + # Enable custom tqdm progress in pandas! + Series.progress_apply = inner_generator() + SeriesGroupBy.progress_apply = inner_generator() + Series.progress_map = inner_generator('map') + SeriesGroupBy.progress_map = inner_generator('map') + + DataFrame.progress_apply = inner_generator() + DataFrameGroupBy.progress_apply = inner_generator() + DataFrame.progress_applymap = inner_generator('applymap') + + if Panel is not None: + Panel.progress_apply = inner_generator() + if PanelGroupBy is not None: + PanelGroupBy.progress_apply = inner_generator() + + GroupBy.progress_apply = inner_generator() + GroupBy.progress_aggregate = inner_generator('aggregate') + GroupBy.progress_transform = inner_generator('transform') + + if _Rolling_and_Expanding is not None: # pragma: no cover + _Rolling_and_Expanding.progress_apply = inner_generator() + + def __init__(self, iterable=None, desc=None, total=None, leave=True, + file=None, ncols=None, mininterval=0.1, maxinterval=10.0, + miniters=None, ascii=None, disable=False, unit='it', + unit_scale=False, dynamic_ncols=False, smoothing=0.3, + bar_format=None, initial=0, position=None, postfix=None, + unit_divisor=1000, write_bytes=None, lock_args=None, + nrows=None, + gui=False, **kwargs): + """ + Parameters + ---------- + iterable : iterable, optional + Iterable to decorate with a progressbar. + Leave blank to manually manage the updates. + desc : str, optional + Prefix for the progressbar. + total : int or float, optional + The number of expected iterations. If unspecified, + len(iterable) is used if possible. If float("inf") or as a last + resort, only basic progress statistics are displayed + (no ETA, no progressbar). + If `gui` is True and this parameter needs subsequent updating, + specify an initial arbitrary large positive number, + e.g. 9e9. + leave : bool, optional + If [default: True], keeps all traces of the progressbar + upon termination of iteration. + If `None`, will leave only if `position` is `0`. + file : `io.TextIOWrapper` or `io.StringIO`, optional + Specifies where to output the progress messages + (default: sys.stderr). Uses `file.write(str)` and `file.flush()` + methods. For encoding, see `write_bytes`. + ncols : int, optional + The width of the entire output message. If specified, + dynamically resizes the progressbar to stay within this bound. + If unspecified, attempts to use environment width. The + fallback is a meter width of 10 and no limit for the counter and + statistics. If 0, will not print any meter (only stats). + mininterval : float, optional + Minimum progress display update interval [default: 0.1] seconds. + maxinterval : float, optional + Maximum progress display update interval [default: 10] seconds. + Automatically adjusts `miniters` to correspond to `mininterval` + after long display update lag. Only works if `dynamic_miniters` + or monitor thread is enabled. + miniters : int or float, optional + Minimum progress display update interval, in iterations. + If 0 and `dynamic_miniters`, will automatically adjust to equal + `mininterval` (more CPU efficient, good for tight loops). + If > 0, will skip display of specified number of iterations. + Tweak this and `mininterval` to get very efficient loops. + If your progress is erratic with both fast and slow iterations + (network, skipping items, etc) you should set miniters=1. + ascii : bool or str, optional + If unspecified or False, use unicode (smooth blocks) to fill + the meter. The fallback is to use ASCII characters " 123456789#". + disable : bool, optional + Whether to disable the entire progressbar wrapper + [default: False]. If set to None, disable on non-TTY. + unit : str, optional + String that will be used to define the unit of each iteration + [default: it]. + unit_scale : bool or int or float, optional + If 1 or True, the number of iterations will be reduced/scaled + automatically and a metric prefix following the + International System of Units standard will be added + (kilo, mega, etc.) [default: False]. If any other non-zero + number, will scale `total` and `n`. + dynamic_ncols : bool, optional + If set, constantly alters `ncols` and `nrows` to the + environment (allowing for window resizes) [default: False]. + smoothing : float, optional + Exponential moving average smoothing factor for speed estimates + (ignored in GUI mode). Ranges from 0 (average speed) to 1 + (current/instantaneous speed) [default: 0.3]. + bar_format : str, optional + Specify a custom bar string formatting. May impact performance. + [default: '{l_bar}{bar}{r_bar}'], where + l_bar='{desc}: {percentage:3.0f}%|' and + r_bar='| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, ' + '{rate_fmt}{postfix}]' + Possible vars: l_bar, bar, r_bar, n, n_fmt, total, total_fmt, + percentage, elapsed, elapsed_s, ncols, nrows, desc, unit, + rate, rate_fmt, rate_noinv, rate_noinv_fmt, + rate_inv, rate_inv_fmt, postfix, unit_divisor, + remaining, remaining_s. + Note that a trailing ": " is automatically removed after {desc} + if the latter is empty. + initial : int or float, optional + The initial counter value. Useful when restarting a progress + bar [default: 0]. If using float, consider specifying `{n:.3f}` + or similar in `bar_format`, or specifying `unit_scale`. + position : int, optional + Specify the line offset to print this bar (starting from 0) + Automatic if unspecified. + Useful to manage multiple bars at once (eg, from threads). + postfix : dict or *, optional + Specify additional stats to display at the end of the bar. + Calls `set_postfix(**postfix)` if possible (dict). + unit_divisor : float, optional + [default: 1000], ignored unless `unit_scale` is True. + write_bytes : bool, optional + If (default: None) and `file` is unspecified, + bytes will be written in Python 2. If `True` will also write + bytes. In all other cases will default to unicode. + lock_args : tuple, optional + Passed to `refresh` for intermediate output + (initialisation, iterating, and updating). + nrows : int, optional + The screen height. If specified, hides nested bars outside this + bound. If unspecified, attempts to use environment height. + The fallback is 20. + gui : bool, optional + WARNING: internal parameter - do not use. + Use tqdm.gui.tqdm(...) instead. If set, will attempt to use + matplotlib animations for a graphical output [default: False]. + + Returns + ------- + out : decorated iterator. + """ + if write_bytes is None: + write_bytes = file is None and sys.version_info < (3,) + + if file is None: + file = sys.stderr + + if write_bytes: + # Despite coercing unicode into bytes, py2 sys.std* streams + # should have bytes written to them. + file = SimpleTextIOWrapper( + file, encoding=getattr(file, 'encoding', None) or 'utf-8') + + if disable is None and hasattr(file, "isatty") and not file.isatty(): + disable = True + + if total is None and iterable is not None: + try: + total = len(iterable) + except (TypeError, AttributeError): + total = None + if total == float("inf"): + # Infinite iterations, behave same as unknown + total = None + + if disable: + self.iterable = iterable + self.disable = disable + with self._lock: + self.pos = self._get_free_pos(self) + self._instances.remove(self) + self.n = initial + self.total = total + self.leave = leave + return + + if kwargs: + self.disable = True + with self._lock: + self.pos = self._get_free_pos(self) + self._instances.remove(self) + raise ( + TqdmDeprecationWarning( + "`nested` is deprecated and automated.\n" + "Use `position` instead for manual control.\n", + fp_write=getattr(file, 'write', sys.stderr.write)) + if "nested" in kwargs else + TqdmKeyError("Unknown argument(s): " + str(kwargs))) + + # Preprocess the arguments + if ((ncols is None or nrows is None) and + (file in (sys.stderr, sys.stdout))) or \ + dynamic_ncols: # pragma: no cover + if dynamic_ncols: + dynamic_ncols = _screen_shape_wrapper() + if dynamic_ncols: + ncols, nrows = dynamic_ncols(file) + else: + _dynamic_ncols = _screen_shape_wrapper() + if _dynamic_ncols: + _ncols, _nrows = _dynamic_ncols(file) + if ncols is None: + ncols = _ncols + if nrows is None: + nrows = _nrows + + if miniters is None: + miniters = 0 + dynamic_miniters = True + else: + dynamic_miniters = False + + if mininterval is None: + mininterval = 0 + + if maxinterval is None: + maxinterval = 0 + + if ascii is None: + ascii = not _supports_unicode(file) + + if bar_format and not ((ascii is True) or _is_ascii(ascii)): + # Convert bar format into unicode since terminal uses unicode + bar_format = _unicode(bar_format) + + if smoothing is None: + smoothing = 0 + + # Store the arguments + self.iterable = iterable + self.desc = desc or '' + self.total = total + self.leave = leave + self.fp = file + self.ncols = ncols + self.nrows = nrows + self.mininterval = mininterval + self.maxinterval = maxinterval + self.miniters = miniters + self.dynamic_miniters = dynamic_miniters + self.ascii = ascii + self.disable = disable + self.unit = unit + self.unit_scale = unit_scale + self.unit_divisor = unit_divisor + self.lock_args = lock_args + self.gui = gui + self.dynamic_ncols = dynamic_ncols + self.smoothing = smoothing + self.avg_time = None + self._time = time + self.bar_format = bar_format + self.postfix = None + if postfix: + try: + self.set_postfix(refresh=False, **postfix) + except TypeError: + self.postfix = postfix + + # Init the iterations counters + self.last_print_n = initial + self.n = initial + + # if nested, at initial sp() call we replace '\r' by '\n' to + # not overwrite the outer progress bar + with self._lock: + if position is None: + self.pos = self._get_free_pos(self) + else: # mark fixed positions as negative + self.pos = -position + + if not gui: + # Initialize the screen printer + self.sp = self.status_printer(self.fp) + self.refresh(lock_args=self.lock_args) + + # Init the time counter + self.last_print_t = self._time() + # NB: Avoid race conditions by setting start_t at the very end of init + self.start_t = self.last_print_t + + def __bool__(self): + if self.total is not None: + return self.total > 0 + if self.iterable is None: + raise TypeError('bool() undefined when iterable == total == None') + return bool(self.iterable) + + def __nonzero__(self): + return self.__bool__() + + def __len__(self): + return self.total if self.iterable is None else \ + (self.iterable.shape[0] if hasattr(self.iterable, "shape") + else len(self.iterable) if hasattr(self.iterable, "__len__") + else getattr(self, "total", None)) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + try: + self.close() + except AttributeError: + # maybe eager thread cleanup upon external error + if (exc_type, exc_value, traceback) == (None, None, None): + raise + warn("AttributeError ignored", TqdmWarning, stacklevel=2) + + def __del__(self): + self.close() + + def __repr__(self): + return self.format_meter(**self.format_dict) + + @property + def _comparable(self): + return abs(getattr(self, "pos", 1 << 31)) + + def __hash__(self): + return id(self) + + def __iter__(self): + """Backward-compatibility to use: for x in tqdm(iterable)""" + + # Inlining instance variables as locals (speed optimisation) + iterable = self.iterable + + # If the bar is disabled, then just walk the iterable + # (note: keep this check outside the loop for performance) + if self.disable: + for obj in iterable: + yield obj + return + + mininterval = self.mininterval + maxinterval = self.maxinterval + miniters = self.miniters + dynamic_miniters = self.dynamic_miniters + last_print_t = self.last_print_t + last_print_n = self.last_print_n + n = self.n + smoothing = self.smoothing + avg_time = self.avg_time + time = self._time + + if not hasattr(self, 'sp'): + raise TqdmDeprecationWarning( + "Please use `tqdm.gui.tqdm(...)` instead of" + " `tqdm(..., gui=True)`\n", + fp_write=getattr(self.fp, 'write', sys.stderr.write)) + + try: + for obj in iterable: + yield obj + # Update and possibly print the progressbar. + # Note: does not call self.update(1) for speed optimisation. + n += 1 + # check counter first to avoid calls to time() + if n - last_print_n >= self.miniters: + miniters = self.miniters # watch monitoring thread changes + delta_t = time() - last_print_t + if delta_t >= mininterval: + cur_t = time() + delta_it = n - last_print_n + # EMA (not just overall average) + if smoothing and delta_t and delta_it: + rate = delta_t / delta_it + avg_time = self.ema(rate, avg_time, smoothing) + self.avg_time = avg_time + + self.n = n + self.refresh(lock_args=self.lock_args) + + # If no `miniters` was specified, adjust automatically + # to the max iteration rate seen so far between 2 prints + if dynamic_miniters: + if maxinterval and delta_t >= maxinterval: + # Adjust miniters to time interval by rule of 3 + if mininterval: + # Set miniters to correspond to mininterval + miniters = delta_it * mininterval / delta_t + else: + # Set miniters to correspond to maxinterval + miniters = delta_it * maxinterval / delta_t + elif smoothing: + # EMA-weight miniters to converge + # towards the timeframe of mininterval + rate = delta_it + if mininterval and delta_t: + rate *= mininterval / delta_t + miniters = self.ema(rate, miniters, smoothing) + else: + # Maximum nb of iterations between 2 prints + miniters = max(miniters, delta_it) + + # Store old values for next call + self.n = self.last_print_n = last_print_n = n + self.last_print_t = last_print_t = cur_t + self.miniters = miniters + finally: + # Closing the progress bar. + # Update some internal variables for close(). + self.last_print_n = last_print_n + self.n = n + self.miniters = miniters + self.close() + + def update(self, n=1): + """ + Manually update the progress bar, useful for streams + such as reading files. + E.g.: + >>> t = tqdm(total=filesize) # Initialise + >>> for current_buffer in stream: + ... ... + ... t.update(len(current_buffer)) + >>> t.close() + The last line is highly recommended, but possibly not necessary if + `t.update()` will be called in such a way that `filesize` will be + exactly reached and printed. + + Parameters + ---------- + n : int or float, optional + Increment to add to the internal counter of iterations + [default: 1]. If using float, consider specifying `{n:.3f}` + or similar in `bar_format`, or specifying `unit_scale`. + """ + # N.B.: see __iter__() for more comments. + if self.disable: + return + + if n < 0: + self.last_print_n += n # for auto-refresh logic to work + self.n += n + + # check counter first to reduce calls to time() + if self.n - self.last_print_n >= self.miniters: + delta_t = self._time() - self.last_print_t + if delta_t >= self.mininterval: + cur_t = self._time() + delta_it = self.n - self.last_print_n # >= n + # elapsed = cur_t - self.start_t + # EMA (not just overall average) + if self.smoothing and delta_t and delta_it: + rate = delta_t / delta_it + self.avg_time = self.ema( + rate, self.avg_time, self.smoothing) + + if not hasattr(self, "sp"): + raise TqdmDeprecationWarning( + "Please use `tqdm.gui.tqdm(...)`" + " instead of `tqdm(..., gui=True)`\n", + fp_write=getattr(self.fp, 'write', sys.stderr.write)) + + self.refresh(lock_args=self.lock_args) + + # If no `miniters` was specified, adjust automatically to the + # maximum iteration rate seen so far between two prints. + # e.g.: After running `tqdm.update(5)`, subsequent + # calls to `tqdm.update()` will only cause an update after + # at least 5 more iterations. + if self.dynamic_miniters: + if self.maxinterval and delta_t >= self.maxinterval: + if self.mininterval: + self.miniters = delta_it * self.mininterval \ + / delta_t + else: + self.miniters = delta_it * self.maxinterval \ + / delta_t + elif self.smoothing: + self.miniters = self.smoothing * delta_it * \ + (self.mininterval / delta_t + if self.mininterval and delta_t + else 1) + \ + (1 - self.smoothing) * self.miniters + else: + self.miniters = max(self.miniters, delta_it) + + # Store old values for next call + self.last_print_n = self.n + self.last_print_t = cur_t + + def close(self): + """Cleanup and (if leave=False) close the progressbar.""" + if self.disable: + return + + # Prevent multiple closures + self.disable = True + + # decrement instance pos and remove from internal set + pos = abs(self.pos) + self._decr_instances(self) + + # GUI mode + if not hasattr(self, "sp"): + return + + # annoyingly, _supports_unicode isn't good enough + def fp_write(s): + self.fp.write(_unicode(s)) + + try: + fp_write('') + except ValueError as e: + if 'closed' in str(e): + return + raise # pragma: no cover + + leave = pos == 0 if self.leave is None else self.leave + + with self._lock: + if leave: + # stats for overall rate (no weighted average) + self.avg_time = None + self.display(pos=0) + fp_write('\n') + else: + # clear previous display + if self.display(msg='', pos=pos) and not pos: + fp_write('\r') + + def clear(self, nolock=False): + """Clear current bar display.""" + if self.disable: + return + + if not nolock: + self._lock.acquire() + pos = abs(self.pos) + if pos < (self.nrows or 20): + self.moveto(pos) + self.sp('') + self.fp.write('\r') # place cursor back at the beginning of line + self.moveto(-pos) + if not nolock: + self._lock.release() + + def refresh(self, nolock=False, lock_args=None): + """ + Force refresh the display of this bar. + + Parameters + ---------- + nolock : bool, optional + If `True`, does not lock. + If [default: `False`]: calls `acquire()` on internal lock. + lock_args : tuple, optional + Passed to internal lock's `acquire()`. + If specified, will only `display()` if `acquire()` returns `True`. + """ + if self.disable: + return + + if not nolock: + if lock_args: + if not self._lock.acquire(*lock_args): + return False + else: + self._lock.acquire() + self.display() + if not nolock: + self._lock.release() + return True + + def unpause(self): + """Restart tqdm timer from last print time.""" + cur_t = self._time() + self.start_t += cur_t - self.last_print_t + self.last_print_t = cur_t + + def reset(self, total=None): + """ + Resets to 0 iterations for repeated use. + + Consider combining with `leave=True`. + + Parameters + ---------- + total : int or float, optional. Total to use for the new bar. + """ + self.last_print_n = self.n = 0 + self.last_print_t = self.start_t = self._time() + if total is not None: + self.total = total + self.refresh() + + def set_description(self, desc=None, refresh=True): + """ + Set/modify description of the progress bar. + + Parameters + ---------- + desc : str, optional + refresh : bool, optional + Forces refresh [default: True]. + """ + self.desc = desc + ': ' if desc else '' + if refresh: + self.refresh() + + def set_description_str(self, desc=None, refresh=True): + """Set/modify description without ': ' appended.""" + self.desc = desc or '' + if refresh: + self.refresh() + + def set_postfix(self, ordered_dict=None, refresh=True, **kwargs): + """ + Set/modify postfix (additional stats) + with automatic formatting based on datatype. + + Parameters + ---------- + ordered_dict : dict or OrderedDict, optional + refresh : bool, optional + Forces refresh [default: True]. + kwargs : dict, optional + """ + # Sort in alphabetical order to be more deterministic + postfix = _OrderedDict([] if ordered_dict is None else ordered_dict) + for key in sorted(kwargs.keys()): + postfix[key] = kwargs[key] + # Preprocess stats according to datatype + for key in postfix.keys(): + # Number: limit the length of the string + if isinstance(postfix[key], Number): + postfix[key] = self.format_num(postfix[key]) + # Else for any other type, try to get the string conversion + elif not isinstance(postfix[key], _basestring): + postfix[key] = str(postfix[key]) + # Else if it's a string, don't need to preprocess anything + # Stitch together to get the final postfix + self.postfix = ', '.join(key + '=' + postfix[key].strip() + for key in postfix.keys()) + if refresh: + self.refresh() + + def set_postfix_str(self, s='', refresh=True): + """ + Postfix without dictionary expansion, similar to prefix handling. + """ + self.postfix = str(s) + if refresh: + self.refresh() + + def moveto(self, n): + # TODO: private method + self.fp.write(_unicode('\n' * n + _term_move_up() * -n)) + self.fp.flush() + + @property + def format_dict(self): + """Public API for read-only member access.""" + if self.dynamic_ncols: + self.ncols, self.nrows = self.dynamic_ncols(self.fp) + ncols, nrows = self.ncols, self.nrows + return dict( + n=self.n, total=self.total, + elapsed=self._time() - self.start_t + if hasattr(self, 'start_t') else 0, + ncols=ncols, nrows=nrows, + prefix=self.desc, ascii=self.ascii, unit=self.unit, + unit_scale=self.unit_scale, + rate=1 / self.avg_time if self.avg_time else None, + bar_format=self.bar_format, postfix=self.postfix, + unit_divisor=self.unit_divisor) + + def display(self, msg=None, pos=None): + """ + Use `self.sp` to display `msg` in the specified `pos`. + + Consider overloading this function when inheriting to use e.g.: + `self.some_frontend(**self.format_dict)` instead of `self.sp`. + + Parameters + ---------- + msg : str, optional. What to display (default: `repr(self)`). + pos : int, optional. Position to `moveto` + (default: `abs(self.pos)`). + """ + if pos is None: + pos = abs(self.pos) + + nrows = self.nrows or 20 + if pos >= nrows - 1: + if pos >= nrows: + return False + if msg or msg is None: # override at `nrows - 1` + msg = " ... (more hidden) ..." + + if pos: + self.moveto(pos) + self.sp(self.__repr__() if msg is None else msg) + if pos: + self.moveto(-pos) + return True + + @classmethod + @contextmanager + def wrapattr(tclass, stream, method, total=None, bytes=True, **tkwargs): + """ + stream : file-like object. + method : str, "read" or "write". The result of `read()` and + the first argument of `write()` should have a `len()`. + + >>> with tqdm.wrapattr(file_obj, "read", total=file_obj.size) as fobj: + ... while True: + ... chunk = fobj.read(chunk_size) + ... if not chunk: + ... break + """ + with tclass(total=total, **tkwargs) as t: + if bytes: + t.unit = "B" + t.unit_scale = True + t.unit_divisor = 1024 + yield CallbackIOWrapper(t.update, stream, method) + + +def trange(*args, **kwargs): + """ + A shortcut for tqdm(xrange(*args), **kwargs). + On Python3+ range is used instead of xrange. + """ + return tqdm(_range(*args), **kwargs) diff --git a/libs/tqdm/tests/tests_concurrent.py b/libs/tqdm/tests/tests_concurrent.py new file mode 100644 index 000000000..e64cb789b --- /dev/null +++ b/libs/tqdm/tests/tests_concurrent.py @@ -0,0 +1,58 @@ +""" +Tests for `tqdm.contrib.concurrent`. +""" +from warnings import catch_warnings +from tqdm.contrib.concurrent import thread_map, process_map +from tests_tqdm import with_setup, pretest, posttest, SkipTest, StringIO, \ + closing + + +def incr(x): + """Dummy function""" + return x + 1 + + +@with_setup(pretest, posttest) +def test_thread_map(): + """Test contrib.concurrent.thread_map""" + with closing(StringIO()) as our_file: + a = range(9) + b = [i + 1 for i in a] + try: + assert thread_map(lambda x: x + 1, a, file=our_file) == b + except ImportError: + raise SkipTest + assert thread_map(incr, a, file=our_file) == b + + +@with_setup(pretest, posttest) +def test_process_map(): + """Test contrib.concurrent.process_map""" + with closing(StringIO()) as our_file: + a = range(9) + b = [i + 1 for i in a] + try: + assert process_map(incr, a, file=our_file) == b + except ImportError: + raise SkipTest + + +def test_chunksize_warning(): + """Test contrib.concurrent.process_map chunksize warnings""" + try: + from unittest.mock import patch + except ImportError: + raise SkipTest + + for iterables, should_warn in [ + ([], False), + (['x'], False), + ([()], False), + (['x', ()], False), + (['x' * 1001], True), + (['x' * 100, ('x',) * 1001], True), + ]: + with patch('tqdm.contrib.concurrent._executor_map'): + with catch_warnings(record=True) as w: + process_map(incr, *iterables) + assert should_warn == bool(w) diff --git a/libs/tqdm/tests/tests_contrib.py b/libs/tqdm/tests/tests_contrib.py new file mode 100644 index 000000000..e79fad22e --- /dev/null +++ b/libs/tqdm/tests/tests_contrib.py @@ -0,0 +1,61 @@ +""" +Tests for `tqdm.contrib`. +""" +import sys +from tqdm.contrib import tenumerate, tzip, tmap +from tests_tqdm import with_setup, pretest, posttest, SkipTest, StringIO, \ + closing + + +def incr(x): + """Dummy function""" + return x + 1 + + +@with_setup(pretest, posttest) +def test_enumerate(): + """Test contrib.tenumerate""" + with closing(StringIO()) as our_file: + a = range(9) + assert list(tenumerate(a, file=our_file)) == list(enumerate(a)) + assert list(tenumerate(a, 42, file=our_file)) == list(enumerate(a, 42)) + + +@with_setup(pretest, posttest) +def test_enumerate_numpy(): + """Test contrib.tenumerate(numpy.ndarray)""" + try: + import numpy as np + except ImportError: + raise SkipTest + with closing(StringIO()) as our_file: + a = np.random.random((42, 1337)) + assert list(tenumerate(a, file=our_file)) == list(np.ndenumerate(a)) + + +@with_setup(pretest, posttest) +def test_zip(): + """Test contrib.tzip""" + with closing(StringIO()) as our_file: + a = range(9) + b = [i + 1 for i in a] + if sys.version_info[:1] < (3,): + assert tzip(a, b, file=our_file) == zip(a, b) + else: + gen = tzip(a, b, file=our_file) + assert gen != list(zip(a, b)) + assert list(gen) == list(zip(a, b)) + + +@with_setup(pretest, posttest) +def test_map(): + """Test contrib.tmap""" + with closing(StringIO()) as our_file: + a = range(9) + b = [i + 1 for i in a] + if sys.version_info[:1] < (3,): + assert tmap(lambda x: x + 1, a, file=our_file) == map(incr, a) + else: + gen = tmap(lambda x: x + 1, a, file=our_file) + assert gen != b + assert list(gen) == b diff --git a/libs/tqdm/tests/tests_itertools.py b/libs/tqdm/tests/tests_itertools.py new file mode 100644 index 000000000..c55e07db8 --- /dev/null +++ b/libs/tqdm/tests/tests_itertools.py @@ -0,0 +1,27 @@ +""" +Tests for `tqdm.contrib.itertools`. +""" +from tqdm.contrib.itertools import product +from tests_tqdm import with_setup, pretest, posttest, StringIO, closing +import itertools + + +class NoLenIter(object): + def __init__(self, iterable): + self._it = iterable + + def __iter__(self): + for i in self._it: + yield i + + +@with_setup(pretest, posttest) +def test_product(): + """Test contrib.itertools.product""" + with closing(StringIO()) as our_file: + a = range(9) + assert list(product(a, a[::-1], file=our_file)) == \ + list(itertools.product(a, a[::-1])) + + assert list(product(a, NoLenIter(a), file=our_file)) == \ + list(itertools.product(a, NoLenIter(a))) diff --git a/libs/tqdm/tests/tests_keras.py b/libs/tqdm/tests/tests_keras.py new file mode 100644 index 000000000..11684c490 --- /dev/null +++ b/libs/tqdm/tests/tests_keras.py @@ -0,0 +1,97 @@ +from __future__ import division +from tqdm import tqdm +from tests_tqdm import with_setup, pretest, posttest, SkipTest, StringIO, \ + closing + + +@with_setup(pretest, posttest) +def test_keras(): + """Test tqdm.keras.TqdmCallback""" + try: + from tqdm.keras import TqdmCallback + import numpy as np + try: + import keras as K + except ImportError: + from tensorflow import keras as K + except ImportError: + raise SkipTest + + # 1D autoencoder + dtype = np.float32 + model = K.models.Sequential( + [K.layers.InputLayer((1, 1), dtype=dtype), K.layers.Conv1D(1, 1)] + ) + model.compile("adam", "mse") + x = np.random.rand(100, 1, 1).astype(dtype) + batch_size = 10 + batches = len(x) / batch_size + epochs = 5 + + with closing(StringIO()) as our_file: + + class Tqdm(tqdm): + """redirected I/O class""" + + def __init__(self, *a, **k): + k.setdefault("file", our_file) + super(Tqdm, self).__init__(*a, **k) + + # just epoch (no batch) progress + model.fit( + x, + x, + epochs=epochs, + batch_size=batch_size, + verbose=False, + callbacks=[ + TqdmCallback( + epochs, + data_size=len(x), + batch_size=batch_size, + verbose=0, + tqdm_class=Tqdm, + ) + ], + ) + res = our_file.getvalue() + assert "{epochs}/{epochs}".format(epochs=epochs) in res + assert "{batches}/{batches}".format(batches=batches) not in res + + # full (epoch and batch) progress + our_file.seek(0) + our_file.truncate() + model.fit( + x, + x, + epochs=epochs, + batch_size=batch_size, + verbose=False, + callbacks=[ + TqdmCallback( + epochs, + data_size=len(x), + batch_size=batch_size, + verbose=2, + tqdm_class=Tqdm, + ) + ], + ) + res = our_file.getvalue() + assert "{epochs}/{epochs}".format(epochs=epochs) in res + assert "{batches}/{batches}".format(batches=batches) in res + + # auto-detect epochs and batches + our_file.seek(0) + our_file.truncate() + model.fit( + x, + x, + epochs=epochs, + batch_size=batch_size, + verbose=False, + callbacks=[TqdmCallback(verbose=2, tqdm_class=Tqdm)], + ) + res = our_file.getvalue() + assert "{epochs}/{epochs}".format(epochs=epochs) in res + assert "{batches}/{batches}".format(batches=batches) in res diff --git a/libs/tqdm/tests/tests_main.py b/libs/tqdm/tests/tests_main.py new file mode 100644 index 000000000..75727071e --- /dev/null +++ b/libs/tqdm/tests/tests_main.py @@ -0,0 +1,172 @@ +import sys +import subprocess +from os import path +from shutil import rmtree +from tempfile import mkdtemp +from tqdm.cli import main, TqdmKeyError, TqdmTypeError +from tqdm.utils import IS_WIN +from io import open as io_open + +from tests_tqdm import with_setup, pretest, posttest, _range, closing, \ + UnicodeIO, StringIO, SkipTest + + +def _sh(*cmd, **kwargs): + return subprocess.Popen(cmd, stdout=subprocess.PIPE, + **kwargs).communicate()[0].decode('utf-8') + + +class Null(object): + def __call__(self, *_, **__): + return self + + def __getattr__(self, _): + return self + + +IN_DATA_LIST = map(str, _range(int(123))) +NULL = Null() + + +# WARNING: this should be the last test as it messes with sys.stdin, argv +@with_setup(pretest, posttest) +def test_main(): + """Test command line pipes""" + ls_out = _sh('ls').replace('\r\n', '\n') + ls = subprocess.Popen('ls', stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + res = _sh(sys.executable, '-c', 'from tqdm.cli import main; main()', + stdin=ls.stdout, stderr=subprocess.STDOUT) + ls.wait() + + # actual test: + + assert ls_out in res.replace('\r\n', '\n') + + # semi-fake test which gets coverage: + _SYS = sys.stdin, sys.argv + + with closing(StringIO()) as sys.stdin: + sys.argv = ['', '--desc', 'Test CLI --delim', + '--ascii', 'True', '--delim', r'\0', '--buf_size', '64'] + sys.stdin.write('\0'.join(map(str, _range(int(123))))) + # sys.stdin.write(b'\xff') # TODO + sys.stdin.seek(0) + main() + sys.stdin = IN_DATA_LIST + + sys.argv = ['', '--desc', 'Test CLI pipes', + '--ascii', 'True', '--unit_scale', 'True'] + import tqdm.__main__ # NOQA + + with closing(StringIO()) as sys.stdin: + IN_DATA = '\0'.join(IN_DATA_LIST) + sys.stdin.write(IN_DATA) + sys.stdin.seek(0) + sys.argv = ['', '--ascii', '--bytes=True', '--unit_scale', 'False'] + with closing(UnicodeIO()) as fp: + main(fp=fp) + assert str(len(IN_DATA)) in fp.getvalue() + sys.stdin = IN_DATA_LIST + + # test --log + with closing(StringIO()) as sys.stdin: + sys.stdin.write('\0'.join(map(str, _range(int(123))))) + sys.stdin.seek(0) + # with closing(UnicodeIO()) as fp: + main(argv=['--log', 'DEBUG'], fp=NULL) + # assert "DEBUG:" in sys.stdout.getvalue() + sys.stdin = IN_DATA_LIST + + # clean up + sys.stdin, sys.argv = _SYS + + +def test_manpath(): + """Test CLI --manpath""" + if IS_WIN: + raise SkipTest + tmp = mkdtemp() + man = path.join(tmp, "tqdm.1") + assert not path.exists(man) + try: + main(argv=['--manpath', tmp], fp=NULL) + except SystemExit: + pass + else: + raise SystemExit("Expected system exit") + assert path.exists(man) + rmtree(tmp, True) + + +def test_comppath(): + """Test CLI --comppath""" + if IS_WIN: + raise SkipTest + tmp = mkdtemp() + man = path.join(tmp, "tqdm_completion.sh") + assert not path.exists(man) + try: + main(argv=['--comppath', tmp], fp=NULL) + except SystemExit: + pass + else: + raise SystemExit("Expected system exit") + assert path.exists(man) + + # check most important options appear + with io_open(man, mode='r', encoding='utf-8') as fd: + script = fd.read() + opts = set([ + '--help', '--desc', '--total', '--leave', '--ncols', '--ascii', + '--dynamic_ncols', '--position', '--bytes', '--nrows', '--delim', + '--manpath', '--comppath' + ]) + assert all(args in script for args in opts) + rmtree(tmp, True) + + +def test_exceptions(): + """Test CLI Exceptions""" + _SYS = sys.stdin, sys.argv + sys.stdin = IN_DATA_LIST + + sys.argv = ['', '-ascii', '-unit_scale', '--bad_arg_u_ment', 'foo'] + try: + main(fp=NULL) + except TqdmKeyError as e: + if 'bad_arg_u_ment' not in str(e): + raise + else: + raise TqdmKeyError('bad_arg_u_ment') + + sys.argv = ['', '-ascii', '-unit_scale', 'invalid_bool_value'] + try: + main(fp=NULL) + except TqdmTypeError as e: + if 'invalid_bool_value' not in str(e): + raise + else: + raise TqdmTypeError('invalid_bool_value') + + sys.argv = ['', '-ascii', '--total', 'invalid_int_value'] + try: + main(fp=NULL) + except TqdmTypeError as e: + if 'invalid_int_value' not in str(e): + raise + else: + raise TqdmTypeError('invalid_int_value') + + # test SystemExits + for i in ('-h', '--help', '-v', '--version'): + sys.argv = ['', i] + try: + main(fp=NULL) + except SystemExit: + pass + else: + raise ValueError('expected SystemExit') + + # clean up + sys.stdin, sys.argv = _SYS diff --git a/libs/tqdm/tests/tests_notebook.py b/libs/tqdm/tests/tests_notebook.py new file mode 100644 index 000000000..3af992f0c --- /dev/null +++ b/libs/tqdm/tests/tests_notebook.py @@ -0,0 +1,9 @@ +from tqdm.notebook import tqdm as tqdm_notebook +from tests_tqdm import with_setup, pretest, posttest + + +@with_setup(pretest, posttest) +def test_notebook_disabled_description(): + """Test that set_description works for disabled tqdm_notebook""" + with tqdm_notebook(1, disable=True) as t: + t.set_description("description") diff --git a/libs/tqdm/tests/tests_pandas.py b/libs/tqdm/tests/tests_pandas.py new file mode 100644 index 000000000..8719a7ca2 --- /dev/null +++ b/libs/tqdm/tests/tests_pandas.py @@ -0,0 +1,264 @@ +from tqdm import tqdm +from tests_tqdm import with_setup, pretest, posttest, SkipTest, \ + StringIO, closing + + +@with_setup(pretest, posttest) +def test_pandas_setup(): + """Test tqdm.pandas()""" + try: + from numpy.random import randint + import pandas as pd + except ImportError: + raise SkipTest + + with closing(StringIO()) as our_file: + tqdm.pandas(file=our_file, leave=True, ascii=True, total=123) + series = pd.Series(randint(0, 50, (100,))) + series.progress_apply(lambda x: x + 10) + res = our_file.getvalue() + assert '100/123' in res + + +@with_setup(pretest, posttest) +def test_pandas_rolling_expanding(): + """Test pandas.(Series|DataFrame).(rolling|expanding)""" + try: + from numpy.random import randint + import pandas as pd + except ImportError: + raise SkipTest + + with closing(StringIO()) as our_file: + tqdm.pandas(file=our_file, leave=True, ascii=True) + + series = pd.Series(randint(0, 50, (123,))) + res1 = series.rolling(10).progress_apply(lambda x: 1, raw=True) + res2 = series.rolling(10).apply(lambda x: 1, raw=True) + assert res1.equals(res2) + + res3 = series.expanding(10).progress_apply(lambda x: 2, raw=True) + res4 = series.expanding(10).apply(lambda x: 2, raw=True) + assert res3.equals(res4) + + expects = ['114it'] # 123-10+1 + for exres in expects: + our_file.seek(0) + if our_file.getvalue().count(exres) < 2: + our_file.seek(0) + raise AssertionError( + "\nExpected:\n{0}\nIn:\n{1}\n".format( + exres + " at least twice.", our_file.read())) + + +@with_setup(pretest, posttest) +def test_pandas_series(): + """Test pandas.Series.progress_apply and .progress_map""" + try: + from numpy.random import randint + import pandas as pd + except ImportError: + raise SkipTest + + with closing(StringIO()) as our_file: + tqdm.pandas(file=our_file, leave=True, ascii=True) + + series = pd.Series(randint(0, 50, (123,))) + res1 = series.progress_apply(lambda x: x + 10) + res2 = series.apply(lambda x: x + 10) + assert res1.equals(res2) + + res3 = series.progress_map(lambda x: x + 10) + res4 = series.map(lambda x: x + 10) + assert res3.equals(res4) + + expects = ['100%', '123/123'] + for exres in expects: + our_file.seek(0) + if our_file.getvalue().count(exres) < 2: + our_file.seek(0) + raise AssertionError( + "\nExpected:\n{0}\nIn:\n{1}\n".format( + exres + " at least twice.", our_file.read())) + + +@with_setup(pretest, posttest) +def test_pandas_data_frame(): + """Test pandas.DataFrame.progress_apply and .progress_applymap""" + try: + from numpy.random import randint + import pandas as pd + except ImportError: + raise SkipTest + + with closing(StringIO()) as our_file: + tqdm.pandas(file=our_file, leave=True, ascii=True) + df = pd.DataFrame(randint(0, 50, (100, 200))) + + def task_func(x): + return x + 1 + + # applymap + res1 = df.progress_applymap(task_func) + res2 = df.applymap(task_func) + assert res1.equals(res2) + + # apply unhashable + res1 = [] + df.progress_apply(res1.extend) + assert len(res1) == df.size + + # apply + for axis in [0, 1, 'index', 'columns']: + res3 = df.progress_apply(task_func, axis=axis) + res4 = df.apply(task_func, axis=axis) + assert res3.equals(res4) + + our_file.seek(0) + if our_file.read().count('100%') < 3: + our_file.seek(0) + raise AssertionError("\nExpected:\n{0}\nIn:\n{1}\n".format( + '100% at least three times', our_file.read())) + + # apply_map, apply axis=0, apply axis=1 + expects = ['20000/20000', '200/200', '100/100'] + for exres in expects: + our_file.seek(0) + if our_file.getvalue().count(exres) < 1: + our_file.seek(0) + raise AssertionError( + "\nExpected:\n{0}\nIn:\n {1}\n".format( + exres + " at least once.", our_file.read())) + + +@with_setup(pretest, posttest) +def test_pandas_groupby_apply(): + """Test pandas.DataFrame.groupby(...).progress_apply""" + try: + from numpy.random import randint, rand + import pandas as pd + except ImportError: + raise SkipTest + + with closing(StringIO()) as our_file: + tqdm.pandas(file=our_file, leave=False, ascii=True) + + df = pd.DataFrame(randint(0, 50, (500, 3))) + df.groupby(0).progress_apply(lambda x: None) + + dfs = pd.DataFrame(randint(0, 50, (500, 3)), columns=list('abc')) + dfs.groupby(['a']).progress_apply(lambda x: None) + + df2 = df = pd.DataFrame(dict(a=randint(1, 8, 10000), b=rand(10000))) + res1 = df2.groupby("a").apply(max) + res2 = df2.groupby("a").progress_apply(max) + assert res1.equals(res2) + + our_file.seek(0) + + # don't expect final output since no `leave` and + # high dynamic `miniters` + nexres = '100%|##########|' + if nexres in our_file.read(): + our_file.seek(0) + raise AssertionError("\nDid not expect:\n{0}\nIn:{1}\n".format( + nexres, our_file.read())) + + with closing(StringIO()) as our_file: + tqdm.pandas(file=our_file, leave=True, ascii=True) + + dfs = pd.DataFrame(randint(0, 50, (500, 3)), columns=list('abc')) + dfs.loc[0] = [2, 1, 1] + dfs['d'] = 100 + + expects = ['500/500', '1/1', '4/4', '2/2'] + dfs.groupby(dfs.index).progress_apply(lambda x: None) + dfs.groupby('d').progress_apply(lambda x: None) + dfs.groupby(dfs.columns, axis=1).progress_apply(lambda x: None) + dfs.groupby([2, 2, 1, 1], axis=1).progress_apply(lambda x: None) + + our_file.seek(0) + if our_file.read().count('100%') < 4: + our_file.seek(0) + raise AssertionError("\nExpected:\n{0}\nIn:\n{1}\n".format( + '100% at least four times', our_file.read())) + + for exres in expects: + our_file.seek(0) + if our_file.getvalue().count(exres) < 1: + our_file.seek(0) + raise AssertionError( + "\nExpected:\n{0}\nIn:\n {1}\n".format( + exres + " at least once.", our_file.read())) + + +@with_setup(pretest, posttest) +def test_pandas_leave(): + """Test pandas with `leave=True`""" + try: + from numpy.random import randint + import pandas as pd + except ImportError: + raise SkipTest + + with closing(StringIO()) as our_file: + df = pd.DataFrame(randint(0, 100, (1000, 6))) + tqdm.pandas(file=our_file, leave=True, ascii=True) + df.groupby(0).progress_apply(lambda x: None) + + our_file.seek(0) + + exres = '100%|##########| 100/100' + if exres not in our_file.read(): + our_file.seek(0) + raise AssertionError( + "\nExpected:\n{0}\nIn:{1}\n".format(exres, our_file.read())) + + +@with_setup(pretest, posttest) +def test_pandas_apply_args_deprecation(): + """Test warning info in + `pandas.Dataframe(Series).progress_apply(func, *args)`""" + try: + from numpy.random import randint + from tqdm import tqdm_pandas + import pandas as pd + except ImportError: + raise SkipTest + + with closing(StringIO()) as our_file: + tqdm_pandas(tqdm(file=our_file, leave=False, ascii=True, ncols=20)) + df = pd.DataFrame(randint(0, 50, (500, 3))) + df.progress_apply(lambda x: None, 1) # 1 shall cause a warning + # Check deprecation message + res = our_file.getvalue() + assert all([i in res for i in ( + "TqdmDeprecationWarning", "not supported", + "keyword arguments instead")]) + + +@with_setup(pretest, posttest) +def test_pandas_deprecation(): + """Test bar object instance as argument deprecation""" + try: + from numpy.random import randint + from tqdm import tqdm_pandas + import pandas as pd + except ImportError: + raise SkipTest + + with closing(StringIO()) as our_file: + tqdm_pandas(tqdm(file=our_file, leave=False, ascii=True, ncols=20)) + df = pd.DataFrame(randint(0, 50, (500, 3))) + df.groupby(0).progress_apply(lambda x: None) + # Check deprecation message + assert "TqdmDeprecationWarning" in our_file.getvalue() + assert "instead of `tqdm_pandas(tqdm(...))`" in our_file.getvalue() + + with closing(StringIO()) as our_file: + tqdm_pandas(tqdm, file=our_file, leave=False, ascii=True, ncols=20) + df = pd.DataFrame(randint(0, 50, (500, 3))) + df.groupby(0).progress_apply(lambda x: None) + # Check deprecation message + assert "TqdmDeprecationWarning" in our_file.getvalue() + assert "instead of `tqdm_pandas(tqdm, ...)`" in our_file.getvalue() diff --git a/libs/tqdm/tests/tests_perf.py b/libs/tqdm/tests/tests_perf.py new file mode 100644 index 000000000..6cb7a6ee5 --- /dev/null +++ b/libs/tqdm/tests/tests_perf.py @@ -0,0 +1,367 @@ +from __future__ import print_function, division + +from nose.plugins.skip import SkipTest + +from contextlib import contextmanager + +import sys +from time import sleep, time + +from tqdm import trange +from tqdm import tqdm + +from tests_tqdm import with_setup, pretest, posttest, StringIO, closing, _range + +# Use relative/cpu timer to have reliable timings when there is a sudden load +try: + from time import process_time +except ImportError: + from time import clock + process_time = clock + + +def get_relative_time(prevtime=0): + return process_time() - prevtime + + +def cpu_sleep(t): + """Sleep the given amount of cpu time""" + start = process_time() + while (process_time() - start) < t: + pass + + +def checkCpuTime(sleeptime=0.2): + """Check if cpu time works correctly""" + if checkCpuTime.passed: + return True + # First test that sleeping does not consume cputime + start1 = process_time() + sleep(sleeptime) + t1 = process_time() - start1 + + # secondly check by comparing to cpusleep (where we actually do something) + start2 = process_time() + cpu_sleep(sleeptime) + t2 = process_time() - start2 + + if abs(t1) < 0.0001 and (t1 < t2 / 10): + return True + raise SkipTest + + +checkCpuTime.passed = False + + +@contextmanager +def relative_timer(): + start = process_time() + + def elapser(): + return process_time() - start + + yield lambda: elapser() + spent = process_time() - start + + def elapser(): # NOQA + return spent + + +def retry_on_except(n=3): + def wrapper(fn): + def test_inner(): + for i in range(1, n + 1): + try: + checkCpuTime() + fn() + except SkipTest: + if i >= n: + raise + else: + return + + test_inner.__doc__ = fn.__doc__ + return test_inner + + return wrapper + + +class MockIO(StringIO): + """Wraps StringIO to mock a file with no I/O""" + + def write(self, data): + return + + +def simple_progress(iterable=None, total=None, file=sys.stdout, desc='', + leave=False, miniters=1, mininterval=0.1, width=60): + """Simple progress bar reproducing tqdm's major features""" + n = [0] # use a closure + start_t = [time()] + last_n = [0] + last_t = [0] + if iterable is not None: + total = len(iterable) + + def format_interval(t): + mins, s = divmod(int(t), 60) + h, m = divmod(mins, 60) + if h: + return '{0:d}:{1:02d}:{2:02d}'.format(h, m, s) + else: + return '{0:02d}:{1:02d}'.format(m, s) + + def update_and_print(i=1): + n[0] += i + if (n[0] - last_n[0]) >= miniters: + last_n[0] = n[0] + + if (time() - last_t[0]) >= mininterval: + last_t[0] = time() # last_t[0] == current time + + spent = last_t[0] - start_t[0] + spent_fmt = format_interval(spent) + rate = n[0] / spent if spent > 0 else 0 + if 0.0 < rate < 1.0: + rate_fmt = "%.2fs/it" % (1.0 / rate) + else: + rate_fmt = "%.2fit/s" % rate + + frac = n[0] / total + percentage = int(frac * 100) + eta = (total - n[0]) / rate if rate > 0 else 0 + eta_fmt = format_interval(eta) + + # bar = "#" * int(frac * width) + barfill = " " * int((1.0 - frac) * width) + bar_length, frac_bar_length = divmod(int(frac * width * 10), 10) + bar = '#' * bar_length + frac_bar = chr(48 + frac_bar_length) if frac_bar_length \ + else ' ' + + file.write("\r%s %i%%|%s%s%s| %i/%i [%s<%s, %s]" % + (desc, percentage, bar, frac_bar, barfill, n[0], + total, spent_fmt, eta_fmt, rate_fmt)) + + if n[0] == total and leave: + file.write("\n") + file.flush() + + def update_and_yield(): + for elt in iterable: + yield elt + update_and_print() + + update_and_print(0) + if iterable is not None: + return update_and_yield() + else: + return update_and_print + + +def assert_performance(thresh, name_left, time_left, name_right, time_right): + """raises if time_left > thresh * time_right""" + if time_left > thresh * time_right: + raise ValueError( + ('{name[0]}: {time[0]:f}, ' + '{name[1]}: {time[1]:f}, ' + 'ratio {ratio:f} > {thresh:f}').format( + name=(name_left, name_right), + time=(time_left, time_right), + ratio=time_left / time_right, thresh=thresh)) + + +@with_setup(pretest, posttest) +@retry_on_except() +def test_iter_overhead(): + """Test overhead of iteration based tqdm""" + + total = int(1e6) + + with closing(MockIO()) as our_file: + a = 0 + with trange(total, file=our_file) as t: + with relative_timer() as time_tqdm: + for i in t: + a += i + assert a == (total * total - total) / 2.0 + + a = 0 + with relative_timer() as time_bench: + for i in _range(total): + a += i + our_file.write(a) + + assert_performance(6, 'trange', time_tqdm(), 'range', time_bench()) + + +@with_setup(pretest, posttest) +@retry_on_except() +def test_manual_overhead(): + """Test overhead of manual tqdm""" + + total = int(1e6) + + with closing(MockIO()) as our_file: + with tqdm(total=total * 10, file=our_file, leave=True) as t: + a = 0 + with relative_timer() as time_tqdm: + for i in _range(total): + a += i + t.update(10) + + a = 0 + with relative_timer() as time_bench: + for i in _range(total): + a += i + our_file.write(a) + + assert_performance(6, 'tqdm', time_tqdm(), 'range', time_bench()) + + +def worker(total, blocking=True): + def incr_bar(x): + with closing(StringIO()) as our_file: + for _ in trange( + total, file=our_file, + lock_args=None if blocking else (False,), + miniters=1, mininterval=0, maxinterval=0): + pass + return x + 1 + return incr_bar + + +@with_setup(pretest, posttest) +@retry_on_except() +def test_lock_args(): + """Test overhead of nonblocking threads""" + try: + from concurrent.futures import ThreadPoolExecutor + from threading import RLock + except ImportError: + raise SkipTest + import sys + + total = 8 + subtotal = 1000 + + tqdm.set_lock(RLock()) + with ThreadPoolExecutor(total) as pool: + sys.stderr.write('block ... ') + sys.stderr.flush() + with relative_timer() as time_tqdm: + res = list(pool.map(worker(subtotal, True), range(total))) + assert sum(res) == sum(range(total)) + total + sys.stderr.write('noblock ... ') + sys.stderr.flush() + with relative_timer() as time_noblock: + res = list(pool.map(worker(subtotal, False), range(total))) + assert sum(res) == sum(range(total)) + total + + assert_performance(0.2, 'noblock', time_noblock(), 'tqdm', time_tqdm()) + + +@with_setup(pretest, posttest) +@retry_on_except() +def test_iter_overhead_hard(): + """Test overhead of iteration based tqdm (hard)""" + + total = int(1e5) + + with closing(MockIO()) as our_file: + a = 0 + with trange(total, file=our_file, leave=True, miniters=1, + mininterval=0, maxinterval=0) as t: + with relative_timer() as time_tqdm: + for i in t: + a += i + assert a == (total * total - total) / 2.0 + + a = 0 + with relative_timer() as time_bench: + for i in _range(total): + a += i + our_file.write(("%i" % a) * 40) + + assert_performance(85, 'trange', time_tqdm(), 'range', time_bench()) + + +@with_setup(pretest, posttest) +@retry_on_except() +def test_manual_overhead_hard(): + """Test overhead of manual tqdm (hard)""" + + total = int(1e5) + + with closing(MockIO()) as our_file: + t = tqdm(total=total * 10, file=our_file, leave=True, miniters=1, + mininterval=0, maxinterval=0) + a = 0 + with relative_timer() as time_tqdm: + for i in _range(total): + a += i + t.update(10) + + a = 0 + with relative_timer() as time_bench: + for i in _range(total): + a += i + our_file.write(("%i" % a) * 40) + + assert_performance(85, 'tqdm', time_tqdm(), 'range', time_bench()) + + +@with_setup(pretest, posttest) +@retry_on_except() +def test_iter_overhead_simplebar_hard(): + """Test overhead of iteration based tqdm vs simple progress bar (hard)""" + + total = int(1e4) + + with closing(MockIO()) as our_file: + a = 0 + with trange(total, file=our_file, leave=True, miniters=1, + mininterval=0, maxinterval=0) as t: + with relative_timer() as time_tqdm: + for i in t: + a += i + assert a == (total * total - total) / 2.0 + + a = 0 + s = simple_progress(_range(total), file=our_file, leave=True, + miniters=1, mininterval=0) + with relative_timer() as time_bench: + for i in s: + a += i + + assert_performance( + 5, 'trange', time_tqdm(), 'simple_progress', time_bench()) + + +@with_setup(pretest, posttest) +@retry_on_except() +def test_manual_overhead_simplebar_hard(): + """Test overhead of manual tqdm vs simple progress bar (hard)""" + + total = int(1e4) + + with closing(MockIO()) as our_file: + t = tqdm(total=total * 10, file=our_file, leave=True, miniters=1, + mininterval=0, maxinterval=0) + a = 0 + with relative_timer() as time_tqdm: + for i in _range(total): + a += i + t.update(10) + + simplebar_update = simple_progress( + total=total * 10, file=our_file, leave=True, miniters=1, + mininterval=0) + a = 0 + with relative_timer() as time_bench: + for i in _range(total): + a += i + simplebar_update(10) + + assert_performance( + 5, 'tqdm', time_tqdm(), 'simple_progress', time_bench()) diff --git a/libs/tqdm/tests/tests_synchronisation.py b/libs/tqdm/tests/tests_synchronisation.py new file mode 100644 index 000000000..34f682a58 --- /dev/null +++ b/libs/tqdm/tests/tests_synchronisation.py @@ -0,0 +1,213 @@ +from __future__ import division +from tqdm import tqdm, trange, TMonitor +from tests_tqdm import with_setup, pretest, posttest, SkipTest, \ + StringIO, closing +from tests_tqdm import DiscreteTimer, cpu_timify +from tests_perf import retry_on_except + +import sys +from time import sleep +from threading import Event + + +class FakeSleep(object): + """Wait until the discrete timer reached the required time""" + def __init__(self, dtimer): + self.dtimer = dtimer + + def sleep(self, t): + end = t + self.dtimer.t + while self.dtimer.t < end: + sleep(0.0000001) # sleep a bit to interrupt (instead of pass) + + +class FakeTqdm(object): + _instances = [] + + +def make_create_fake_sleep_event(sleep): + def wait(self, timeout=None): + if timeout is not None: + sleep(timeout) + return self.is_set() + + def create_fake_sleep_event(): + event = Event() + event.wait = wait + return event + + return create_fake_sleep_event + + +def incr(x): + return x + 1 + + +def incr_bar(x): + with closing(StringIO()) as our_file: + for _ in trange(x, lock_args=(False,), file=our_file): + pass + return incr(x) + + +@with_setup(pretest, posttest) +def test_monitor_thread(): + """Test dummy monitoring thread""" + maxinterval = 10 + + # Setup a discrete timer + timer = DiscreteTimer() + TMonitor._time = timer.time + # And a fake sleeper + sleeper = FakeSleep(timer) + TMonitor._event = make_create_fake_sleep_event(sleeper.sleep) + + # Instanciate the monitor + monitor = TMonitor(FakeTqdm, maxinterval) + # Test if alive, then killed + assert monitor.report() + monitor.exit() + timer.sleep(maxinterval * 2) # need to go out of the sleep to die + assert not monitor.report() + # assert not monitor.is_alive() # not working dunno why, thread not killed + del monitor + + +@with_setup(pretest, posttest) +def test_monitoring_and_cleanup(): + """Test for stalled tqdm instance and monitor deletion""" + # Note: should fix miniters for these tests, else with dynamic_miniters + # it's too complicated to handle with monitoring update and maxinterval... + maxinterval = 2 + + total = 1000 + # Setup a discrete timer + timer = DiscreteTimer() + # And a fake sleeper + sleeper = FakeSleep(timer) + # Setup TMonitor to use the timer + TMonitor._time = timer.time + TMonitor._event = make_create_fake_sleep_event(sleeper.sleep) + # Set monitor interval + tqdm.monitor_interval = maxinterval + with closing(StringIO()) as our_file: + with tqdm(total=total, file=our_file, miniters=500, mininterval=0.1, + maxinterval=maxinterval) as t: + cpu_timify(t, timer) + # Do a lot of iterations in a small timeframe + # (smaller than monitor interval) + timer.sleep(maxinterval / 2) # monitor won't wake up + t.update(500) + # check that our fixed miniters is still there + assert t.miniters == 500 + # Then do 1 it after monitor interval, so that monitor kicks in + timer.sleep(maxinterval * 2) + t.update(1) + # Wait for the monitor to get out of sleep's loop and update tqdm.. + timeend = timer.time() + while not (t.monitor.woken >= timeend and t.miniters == 1): + timer.sleep(1) # Force monitor to wake up if it woken too soon + sleep(0.000001) # sleep to allow interrupt (instead of pass) + assert t.miniters == 1 # check that monitor corrected miniters + # Note: at this point, there may be a race condition: monitor saved + # current woken time but timer.sleep() happen just before monitor + # sleep. To fix that, either sleep here or increase time in a loop + # to ensure that monitor wakes up at some point. + + # Try again but already at miniters = 1 so nothing will be done + timer.sleep(maxinterval * 2) + t.update(2) + timeend = timer.time() + while t.monitor.woken < timeend: + timer.sleep(1) # Force monitor to wake up if it woken too soon + sleep(0.000001) + # Wait for the monitor to get out of sleep's loop and update tqdm.. + assert t.miniters == 1 # check that monitor corrected miniters + + # Check that class var monitor is deleted if no instance left + tqdm.monitor_interval = 10 + assert tqdm.monitor is None + + +@with_setup(pretest, posttest) +def test_monitoring_multi(): + """Test on multiple bars, one not needing miniters adjustment""" + # Note: should fix miniters for these tests, else with dynamic_miniters + # it's too complicated to handle with monitoring update and maxinterval... + maxinterval = 2 + + total = 1000 + # Setup a discrete timer + timer = DiscreteTimer() + # And a fake sleeper + sleeper = FakeSleep(timer) + # Setup TMonitor to use the timer + TMonitor._time = timer.time + TMonitor._event = make_create_fake_sleep_event(sleeper.sleep) + # Set monitor interval + tqdm.monitor_interval = maxinterval + with closing(StringIO()) as our_file: + with tqdm(total=total, file=our_file, miniters=500, mininterval=0.1, + maxinterval=maxinterval) as t1: + # Set high maxinterval for t2 so monitor does not need to adjust it + with tqdm(total=total, file=our_file, miniters=500, mininterval=0.1, + maxinterval=1E5) as t2: + cpu_timify(t1, timer) + cpu_timify(t2, timer) + # Do a lot of iterations in a small timeframe + timer.sleep(maxinterval / 2) + t1.update(500) + t2.update(500) + assert t1.miniters == 500 + assert t2.miniters == 500 + # Then do 1 it after monitor interval, so that monitor kicks in + timer.sleep(maxinterval * 2) + t1.update(1) + t2.update(1) + # Wait for the monitor to get out of sleep and update tqdm + timeend = timer.time() + while not (t1.monitor.woken >= timeend and t1.miniters == 1): + timer.sleep(1) + sleep(0.000001) + assert t1.miniters == 1 # check that monitor corrected miniters + assert t2.miniters == 500 # check that t2 was not adjusted + + # Check that class var monitor is deleted if no instance left + tqdm.monitor_interval = 10 + assert tqdm.monitor is None + + +@with_setup(pretest, posttest) +def test_imap(): + """Test multiprocessing.Pool""" + try: + from multiprocessing import Pool + except ImportError: + raise SkipTest + + pool = Pool() + res = list(tqdm(pool.imap(incr, range(100)), disable=True)) + assert res[-1] == 100 + + +# py2: locks won't propagate to incr_bar so may cause `AttributeError` +@retry_on_except(n=3 if sys.version_info < (3,) else 1) +@with_setup(pretest, posttest) +def test_threadpool(): + """Test concurrent.futures.ThreadPoolExecutor""" + try: + from concurrent.futures import ThreadPoolExecutor + from threading import RLock + except ImportError: + raise SkipTest + + tqdm.set_lock(RLock()) + with ThreadPoolExecutor(8) as pool: + try: + res = list(tqdm(pool.map(incr_bar, range(100)), disable=True)) + except AttributeError: + if sys.version_info < (3,): + raise SkipTest + else: + raise + assert sum(res) == sum(range(1, 101)) diff --git a/libs/tqdm/tests/tests_tqdm.py b/libs/tqdm/tests/tests_tqdm.py new file mode 100644 index 000000000..5f322e61a --- /dev/null +++ b/libs/tqdm/tests/tests_tqdm.py @@ -0,0 +1,1966 @@ +# -*- coding: utf-8 -*- +# Advice: use repr(our_file.read()) to print the full output of tqdm +# (else '\r' will replace the previous lines and you'll see only the latest. + +import sys +import csv +import re +import os +from nose import with_setup +from nose.plugins.skip import SkipTest +from nose.tools import assert_raises +from nose.tools import eq_ +from contextlib import contextmanager +from warnings import catch_warnings, simplefilter + +from tqdm import tqdm +from tqdm import trange +from tqdm import TqdmDeprecationWarning +from tqdm.std import Bar +from tqdm.contrib import DummyTqdmFile + +try: + from StringIO import StringIO +except ImportError: + from io import StringIO + +from io import BytesIO +from io import IOBase # to support unicode strings + + +class DeprecationError(Exception): + pass + + +# Ensure we can use `with closing(...) as ... :` syntax +if getattr(StringIO, '__exit__', False) and \ + getattr(StringIO, '__enter__', False): + def closing(arg): + return arg +else: + from contextlib import closing + +try: + _range = xrange +except NameError: + _range = range + +try: + _unicode = unicode +except NameError: + _unicode = str + +nt_and_no_colorama = False +if os.name == 'nt': + try: + import colorama # NOQA + except ImportError: + nt_and_no_colorama = True + +# Regex definitions +# List of control characters +CTRLCHR = [r'\r', r'\n', r'\x1b\[A'] # Need to escape [ for regex +# Regular expressions compilation +RE_rate = re.compile(r'(\d+\.\d+)it/s') +RE_ctrlchr = re.compile("(%s)" % '|'.join(CTRLCHR)) # Match control chars +RE_ctrlchr_excl = re.compile('|'.join(CTRLCHR)) # Match and exclude ctrl chars +RE_pos = re.compile( + r'([\r\n]+((pos\d+) bar:\s+\d+%|\s{3,6})?[^\r\n]*)') + + +def pos_line_diff(res_list, expected_list, raise_nonempty=True): + """ + Return differences between two bar output lists. + To be used with `RE_pos` + """ + res = [(r, e) for r, e in zip(res_list, expected_list) + for pos in [len(e) - len(e.lstrip('\n'))] # bar position + if r != e # simple comparison + if not r.startswith(e) # start matches + or not ( + # move up at end (maybe less due to closing bars) + any(r.endswith(end + i * '\x1b[A') for i in range(pos + 1) + for end in [ + ']', # bar + ' ']) # cleared + or '100%' in r # completed bar + or r == '\n') # final bar + or r[(-1 - pos) * len('\x1b[A'):] == '\x1b[A'] # too many moves up + if raise_nonempty and (res or len(res_list) != len(expected_list)): + if len(res_list) < len(expected_list): + res.extend([(None, e) for e in expected_list[len(res_list):]]) + elif len(res_list) > len(expected_list): + res.extend([(r, None) for r in res_list[len(expected_list):]]) + raise AssertionError( + "Got => Expected\n" + '\n'.join('%r => %r' % i for i in res)) + return res + + +class DiscreteTimer(object): + """Virtual discrete time manager, to precisely control time for tests""" + + def __init__(self): + self.t = 0.0 + + def sleep(self, t): + """Sleep = increment the time counter (almost no CPU used)""" + self.t += t + + def time(self): + """Get the current time""" + return self.t + + +def cpu_timify(t, timer=None): + """Force tqdm to use the specified timer instead of system-wide time()""" + if timer is None: + timer = DiscreteTimer() + t._time = timer.time + t._sleep = timer.sleep + t.start_t = t.last_print_t = t._time() + return timer + + +def pretest(): + # setcheckinterval is deprecated + try: + sys.setswitchinterval(1) + except AttributeError: + sys.setcheckinterval(100) + + if getattr(tqdm, "_instances", False): + n = len(tqdm._instances) + if n: + tqdm._instances.clear() + raise EnvironmentError( + "{0} `tqdm` instances still in existence PRE-test".format(n)) + + +def posttest(): + if getattr(tqdm, "_instances", False): + n = len(tqdm._instances) + if n: + tqdm._instances.clear() + raise EnvironmentError( + "{0} `tqdm` instances still in existence POST-test".format(n)) + + +class UnicodeIO(IOBase): + """Unicode version of StringIO""" + + def __init__(self, *args, **kwargs): + super(UnicodeIO, self).__init__(*args, **kwargs) + self.encoding = 'U8' # io.StringIO supports unicode, but no encoding + self.text = '' + self.cursor = 0 + + def __len__(self): + return len(self.text) + + def seek(self, offset): + self.cursor = offset + + def tell(self): + return self.cursor + + def write(self, s): + self.text = self.text[:self.cursor] + s + \ + self.text[self.cursor + len(s):] + self.cursor += len(s) + + def read(self, n=-1): + _cur = self.cursor + self.cursor = len(self) if n < 0 \ + else min(_cur + n, len(self)) + return self.text[_cur:self.cursor] + + def getvalue(self): + return self.text + + +def get_bar(all_bars, i=None): + """Get a specific update from a whole bar traceback""" + # Split according to any used control characters + bars_split = RE_ctrlchr_excl.split(all_bars) + bars_split = list(filter(None, bars_split)) # filter out empty splits + return bars_split if i is None else bars_split[i] + + +def progressbar_rate(bar_str): + return float(RE_rate.search(bar_str).group(1)) + + +def squash_ctrlchars(s): + """Apply control characters in a string just like a terminal display""" + # Init variables + curline = 0 # current line in our fake terminal + lines = [''] # state of our fake terminal + + # Split input string by control codes + s_split = RE_ctrlchr.split(s) + s_split = filter(None, s_split) # filter out empty splits + + # For each control character or message + for nextctrl in s_split: + # If it's a control character, apply it + if nextctrl == '\r': + # Carriage return + # Go to the beginning of the line + # simplified here: we just empty the string + lines[curline] = '' + elif nextctrl == '\n': + # Newline + # Go to the next line + if curline < (len(lines) - 1): + # If already exists, just move cursor + curline += 1 + else: + # Else the new line is created + lines.append('') + curline += 1 + elif nextctrl == '\x1b[A': + # Move cursor up + if curline > 0: + curline -= 1 + else: + raise ValueError("Cannot go up, anymore!") + # Else, it is a message, we print it on current line + else: + lines[curline] += nextctrl + + return lines + + +def test_format_interval(): + """Test time interval format""" + format_interval = tqdm.format_interval + + assert format_interval(60) == '01:00' + assert format_interval(6160) == '1:42:40' + assert format_interval(238113) == '66:08:33' + + +def test_format_num(): + """Test number format""" + format_num = tqdm.format_num + + assert float(format_num(1337)) == 1337 + assert format_num(int(1e6)) == '1e+6' + assert format_num(1239876) == '1''239''876' + + +def test_format_meter(): + """Test statistics and progress bar formatting""" + try: + unich = unichr + except NameError: + unich = chr + + format_meter = tqdm.format_meter + + assert format_meter(0, 1000, 13) == \ + " 0%| | 0/1000 [00:13<?, ?it/s]" + # If not implementing any changes to _tqdm.py, set prefix='desc' + # or else ": : " will be in output, so assertion should change + assert format_meter(0, 1000, 13, ncols=68, prefix='desc: ') == \ + "desc: 0%| | 0/1000 [00:13<?, ?it/s]" + assert format_meter(231, 1000, 392) == \ + " 23%|" + unich(0x2588) * 2 + unich(0x258e) + \ + " | 231/1000 [06:32<21:44, 1.70s/it]" + assert format_meter(10000, 1000, 13) == \ + "10000it [00:13, 769.23it/s]" + assert format_meter(231, 1000, 392, ncols=56, ascii=True) == \ + " 23%|" + '#' * 3 + '6' + \ + " | 231/1000 [06:32<21:44, 1.70s/it]" + assert format_meter(100000, 1000, 13, unit_scale=True, unit='iB') == \ + "100kiB [00:13, 7.69kiB/s]" + assert format_meter(100, 1000, 12, ncols=0, rate=7.33) == \ + " 10% 100/1000 [00:12<02:02, 7.33it/s]" + eq_( + # ncols is small, l_bar is too large + # l_bar gets chopped + # no bar + # no r_bar + format_meter( + 0, 1000, 13, ncols=10, + bar_format="************{bar:10}$$$$$$$$$$"), + "**********" # 10/12 stars since ncols is 10 + ) + eq_( + # n_cols allows for l_bar and some of bar + # l_bar displays + # bar gets chopped + # no r_bar + format_meter( + 0, 1000, 13, ncols=20, + bar_format="************{bar:10}$$$$$$$$$$"), + "************ " # all 12 stars and 8/10 bar parts + ) + eq_( + # n_cols allows for l_bar, bar, and some of r_bar + # l_bar displays + # bar displays + # r_bar gets chopped + format_meter( + 0, 1000, 13, ncols=30, + bar_format="************{bar:10}$$$$$$$$$$"), + "************ $$$$$$$$" + # all 12 stars and 10 bar parts, but only 8/10 dollar signs + ) + eq_( + # trim left ANSI; escape is before trim zone + format_meter( + 0, 1000, 13, ncols=10, + bar_format="*****\033[22m****\033[0m***{bar:10}$$$$$$$$$$"), + "*****\033[22m****\033[0m*\033[0m" + # we only know it has ANSI codes, so we append an END code anyway + ) + eq_( + # trim left ANSI; escape is at trim zone + format_meter( + 0, 1000, 13, ncols=10, + bar_format="*****\033[22m*****\033[0m**{bar:10}$$$$$$$$$$"), + "*****\033[22m*****\033[0m" + ) + eq_( + # trim left ANSI; escape is after trim zone + format_meter( + 0, 1000, 13, ncols=10, + bar_format="*****\033[22m******\033[0m*{bar:10}$$$$$$$$$$"), + "*****\033[22m*****\033[0m" + ) + # Check that bar_format correctly adapts {bar} size to the rest + assert format_meter(20, 100, 12, ncols=13, rate=8.1, + bar_format=r'{l_bar}{bar}|{n_fmt}/{total_fmt}') == \ + " 20%|" + unich(0x258f) + "|20/100" + assert format_meter(20, 100, 12, ncols=14, rate=8.1, + bar_format=r'{l_bar}{bar}|{n_fmt}/{total_fmt}') == \ + " 20%|" + unich(0x258d) + " |20/100" + # Check wide characters + if sys.version_info >= (3,): + assert format_meter(0, 1000, 13, ncols=68, prefix='fullwidth: ') == \ + "fullwidth: 0%| | 0/1000 [00:13<?, ?it/s]" + assert format_meter(0, 1000, 13, ncols=68, prefix='ニッポン [ニッポン]: ') == \ + "ニッポン [ニッポン]: 0%| | 0/1000 [00:13<?, ?it/s]" + # Check that bar_format can print only {bar} or just one side + assert format_meter(20, 100, 12, ncols=2, rate=8.1, + bar_format=r'{bar}') == \ + unich(0x258d) + " " + assert format_meter(20, 100, 12, ncols=7, rate=8.1, + bar_format=r'{l_bar}{bar}') == \ + " 20%|" + unich(0x258d) + " " + assert format_meter(20, 100, 12, ncols=6, rate=8.1, + bar_format=r'{bar}|test') == \ + unich(0x258f) + "|test" + + +def test_ansi_escape_codes(): + """Test stripping of ANSI escape codes""" + ansi = dict(BOLD='\033[1m', RED='\033[91m', END='\033[0m') + desc_raw = '{BOLD}{RED}Colored{END} description' + ncols = 123 + + desc_stripped = desc_raw.format(BOLD='', RED='', END='') + meter = tqdm.format_meter(0, 100, 0, ncols=ncols, prefix=desc_stripped) + assert len(meter) == ncols + + desc = desc_raw.format(**ansi) + meter = tqdm.format_meter(0, 100, 0, ncols=ncols, prefix=desc) + # `format_meter` inserts an extra END for safety + ansi_len = len(desc) - len(desc_stripped) + len(ansi['END']) + assert len(meter) == ncols + ansi_len + + +def test_si_format(): + """Test SI unit prefixes""" + format_meter = tqdm.format_meter + + assert '9.00 ' in format_meter(1, 9, 1, unit_scale=True, unit='B') + assert '99.0 ' in format_meter(1, 99, 1, unit_scale=True) + assert '999 ' in format_meter(1, 999, 1, unit_scale=True) + assert '9.99k ' in format_meter(1, 9994, 1, unit_scale=True) + assert '10.0k ' in format_meter(1, 9999, 1, unit_scale=True) + assert '99.5k ' in format_meter(1, 99499, 1, unit_scale=True) + assert '100k ' in format_meter(1, 99999, 1, unit_scale=True) + assert '1.00M ' in format_meter(1, 999999, 1, unit_scale=True) + assert '1.00G ' in format_meter(1, 999999999, 1, unit_scale=True) + assert '1.00T ' in format_meter(1, 999999999999, 1, unit_scale=True) + assert '1.00P ' in format_meter(1, 999999999999999, 1, unit_scale=True) + assert '1.00E ' in format_meter(1, 999999999999999999, 1, unit_scale=True) + assert '1.00Z ' in format_meter(1, 999999999999999999999, 1, + unit_scale=True) + assert '1.0Y ' in format_meter(1, 999999999999999999999999, 1, + unit_scale=True) + assert '10.0Y ' in format_meter(1, 9999999999999999999999999, 1, + unit_scale=True) + assert '100.0Y ' in format_meter(1, 99999999999999999999999999, 1, + unit_scale=True) + assert '1000.0Y ' in format_meter(1, 999999999999999999999999999, 1, + unit_scale=True) + + +def test_bar_formatspec(): + """Test Bar.__format__ spec""" + assert "{0:5a}".format(Bar(0.3)) == "#5 " + assert "{0:2}".format(Bar(0.5, charset=" .oO0")) == "0 " + assert "{0:2a}".format(Bar(0.5, charset=" .oO0")) == "# " + assert "{0:-6a}".format(Bar(0.5, 10)) == '## ' + assert "{0:2b}".format(Bar(0.5, 10)) == ' ' + + +@with_setup(pretest, posttest) +def test_all_defaults(): + """Test default kwargs""" + with closing(UnicodeIO()) as our_file: + with tqdm(range(10), file=our_file) as progressbar: + assert len(progressbar) == 10 + for _ in progressbar: + pass + # restore stdout/stderr output for `nosetest` interface + # try: + # sys.stderr.write('\x1b[A') + # except: + # pass + sys.stderr.write('\rTest default kwargs ... ') + + +class WriteTypeChecker(BytesIO): + """File-like to assert the expected type is written""" + def __init__(self, expected_type): + super(WriteTypeChecker, self).__init__() + self.expected_type = expected_type + + def write(self, s): + assert isinstance(s, self.expected_type) + + +@with_setup(pretest, posttest) +def test_native_string_io_for_default_file(): + """Native strings written to unspecified files""" + stderr = sys.stderr + try: + sys.stderr = WriteTypeChecker(expected_type=type('')) + for _ in tqdm(range(3)): + pass + sys.stderr.encoding = None # py2 behaviour + for _ in tqdm(range(3)): + pass + finally: + sys.stderr = stderr + + +@with_setup(pretest, posttest) +def test_unicode_string_io_for_specified_file(): + """Unicode strings written to specified files""" + for _ in tqdm(range(3), file=WriteTypeChecker(expected_type=type(u''))): + pass + + +@with_setup(pretest, posttest) +def test_write_bytes(): + """Test write_bytes argument with and without `file`""" + # specified file (and bytes) + for _ in tqdm(range(3), file=WriteTypeChecker(expected_type=type(b'')), + write_bytes=True): + pass + # unspecified file (and unicode) + stderr = sys.stderr + try: + sys.stderr = WriteTypeChecker(expected_type=type(u'')) + for _ in tqdm(range(3), write_bytes=False): + pass + finally: + sys.stderr = stderr + + +@with_setup(pretest, posttest) +def test_iterate_over_csv_rows(): + """Test csv iterator""" + # Create a test csv pseudo file + with closing(StringIO()) as test_csv_file: + writer = csv.writer(test_csv_file) + for _ in _range(3): + writer.writerow(['test'] * 3) + test_csv_file.seek(0) + + # Test that nothing fails if we iterate over rows + reader = csv.DictReader(test_csv_file, + fieldnames=('row1', 'row2', 'row3')) + with closing(StringIO()) as our_file: + for _ in tqdm(reader, file=our_file): + pass + + +@with_setup(pretest, posttest) +def test_file_output(): + """Test output to arbitrary file-like objects""" + with closing(StringIO()) as our_file: + for i in tqdm(_range(3), file=our_file): + if i == 1: + our_file.seek(0) + assert '0/3' in our_file.read() + + +@with_setup(pretest, posttest) +def test_leave_option(): + """Test `leave=True` always prints info about the last iteration""" + with closing(StringIO()) as our_file: + for _ in tqdm(_range(3), file=our_file, leave=True): + pass + res = our_file.getvalue() + assert '| 3/3 ' in res + assert '\n' == res[-1] # not '\r' + + with closing(StringIO()) as our_file2: + for _ in tqdm(_range(3), file=our_file2, leave=False): + pass + assert '| 3/3 ' not in our_file2.getvalue() + + +@with_setup(pretest, posttest) +def test_trange(): + """Test trange""" + with closing(StringIO()) as our_file: + for _ in trange(3, file=our_file, leave=True): + pass + assert '| 3/3 ' in our_file.getvalue() + + with closing(StringIO()) as our_file2: + for _ in trange(3, file=our_file2, leave=False): + pass + assert '| 3/3 ' not in our_file2.getvalue() + + +@with_setup(pretest, posttest) +def test_min_interval(): + """Test mininterval""" + with closing(StringIO()) as our_file: + for _ in tqdm(_range(3), file=our_file, mininterval=1e-10): + pass + assert " 0%| | 0/3 [00:00<" in our_file.getvalue() + + +@with_setup(pretest, posttest) +def test_max_interval(): + """Test maxinterval""" + total = 100 + bigstep = 10 + smallstep = 5 + + # Test without maxinterval + timer = DiscreteTimer() + with closing(StringIO()) as our_file: + with closing(StringIO()) as our_file2: + # with maxinterval but higher than loop sleep time + t = tqdm(total=total, file=our_file, miniters=None, mininterval=0, + smoothing=1, maxinterval=1e-2) + cpu_timify(t, timer) + + # without maxinterval + t2 = tqdm(total=total, file=our_file2, miniters=None, mininterval=0, + smoothing=1, maxinterval=None) + cpu_timify(t2, timer) + + assert t.dynamic_miniters + assert t2.dynamic_miniters + + # Increase 10 iterations at once + t.update(bigstep) + t2.update(bigstep) + # The next iterations should not trigger maxinterval (step 10) + for _ in _range(4): + t.update(smallstep) + t2.update(smallstep) + timer.sleep(1e-5) + t.close() # because PyPy doesn't gc immediately + t2.close() # as above + + assert "25%" not in our_file2.getvalue() + assert "25%" not in our_file.getvalue() + + # Test with maxinterval effect + timer = DiscreteTimer() + with closing(StringIO()) as our_file: + with tqdm(total=total, file=our_file, miniters=None, mininterval=0, + smoothing=1, maxinterval=1e-4) as t: + cpu_timify(t, timer) + + # Increase 10 iterations at once + t.update(bigstep) + # The next iterations should trigger maxinterval (step 5) + for _ in _range(4): + t.update(smallstep) + timer.sleep(1e-2) + + assert "25%" in our_file.getvalue() + + # Test iteration based tqdm with maxinterval effect + timer = DiscreteTimer() + with closing(StringIO()) as our_file: + with tqdm(_range(total), file=our_file, miniters=None, + mininterval=1e-5, smoothing=1, maxinterval=1e-4) as t2: + cpu_timify(t2, timer) + + for i in t2: + if i >= (bigstep - 1) and \ + ((i - (bigstep - 1)) % smallstep) == 0: + timer.sleep(1e-2) + if i >= 3 * bigstep: + break + + assert "15%" in our_file.getvalue() + + # Test different behavior with and without mininterval + timer = DiscreteTimer() + total = 1000 + mininterval = 0.1 + maxinterval = 10 + with closing(StringIO()) as our_file: + with tqdm(total=total, file=our_file, miniters=None, smoothing=1, + mininterval=mininterval, maxinterval=maxinterval) as tm1: + with tqdm(total=total, file=our_file, miniters=None, smoothing=1, + mininterval=0, maxinterval=maxinterval) as tm2: + + cpu_timify(tm1, timer) + cpu_timify(tm2, timer) + + # Fast iterations, check if dynamic_miniters triggers + timer.sleep(mininterval) # to force update for t1 + tm1.update(total / 2) + tm2.update(total / 2) + assert int(tm1.miniters) == tm2.miniters == total / 2 + + # Slow iterations, check different miniters if mininterval + timer.sleep(maxinterval * 2) + tm1.update(total / 2) + tm2.update(total / 2) + res = [tm1.miniters, tm2.miniters] + assert res == [(total / 2) * mininterval / (maxinterval * 2), + (total / 2) * maxinterval / (maxinterval * 2)] + + # Same with iterable based tqdm + timer1 = DiscreteTimer() # need 2 timers for each bar because zip not work + timer2 = DiscreteTimer() + total = 100 + mininterval = 0.1 + maxinterval = 10 + with closing(StringIO()) as our_file: + t1 = tqdm(_range(total), file=our_file, miniters=None, smoothing=1, + mininterval=mininterval, maxinterval=maxinterval) + t2 = tqdm(_range(total), file=our_file, miniters=None, smoothing=1, + mininterval=0, maxinterval=maxinterval) + + cpu_timify(t1, timer1) + cpu_timify(t2, timer2) + + for i in t1: + if i == ((total / 2) - 2): + timer1.sleep(mininterval) + if i == (total - 1): + timer1.sleep(maxinterval * 2) + + for i in t2: + if i == ((total / 2) - 2): + timer2.sleep(mininterval) + if i == (total - 1): + timer2.sleep(maxinterval * 2) + + assert t1.miniters == 0.255 + assert t2.miniters == 0.5 + + t1.close() + t2.close() + + +@with_setup(pretest, posttest) +def test_min_iters(): + """Test miniters""" + with closing(StringIO()) as our_file: + for _ in tqdm(_range(3), file=our_file, leave=True, miniters=4): + our_file.write('blank\n') + assert '\nblank\nblank\n' in our_file.getvalue() + + with closing(StringIO()) as our_file: + for _ in tqdm(_range(3), file=our_file, leave=True, miniters=1): + our_file.write('blank\n') + # assume automatic mininterval = 0 means intermediate output + assert '| 3/3 ' in our_file.getvalue() + + +@with_setup(pretest, posttest) +def test_dynamic_min_iters(): + """Test purely dynamic miniters (and manual updates and __del__)""" + with closing(StringIO()) as our_file: + total = 10 + t = tqdm(total=total, file=our_file, miniters=None, mininterval=0, + smoothing=1) + + t.update() + # Increase 3 iterations + t.update(3) + # The next two iterations should be skipped because of dynamic_miniters + t.update() + t.update() + # The third iteration should be displayed + t.update() + + out = our_file.getvalue() + assert t.dynamic_miniters + t.__del__() # simulate immediate del gc + + assert ' 0%| | 0/10 [00:00<' in out + assert '40%' in out + assert '50%' not in out + assert '60%' not in out + assert '70%' in out + + # Check with smoothing=0, miniters should be set to max update seen so far + with closing(StringIO()) as our_file: + total = 10 + t = tqdm(total=total, file=our_file, miniters=None, mininterval=0, + smoothing=0) + + t.update() + t.update(2) + t.update(5) # this should be stored as miniters + t.update(1) + + out = our_file.getvalue() + assert all(i in out for i in ("0/10", "1/10", "3/10")) + assert "2/10" not in out + assert t.dynamic_miniters and not t.smoothing + assert t.miniters == 5 + t.close() + + # Check iterable based tqdm + with closing(StringIO()) as our_file: + t = tqdm(_range(10), file=our_file, miniters=None, mininterval=None, + smoothing=0.5) + for _ in t: + pass + assert t.dynamic_miniters + + # No smoothing + with closing(StringIO()) as our_file: + t = tqdm(_range(10), file=our_file, miniters=None, mininterval=None, + smoothing=0) + for _ in t: + pass + assert t.dynamic_miniters + + # No dynamic_miniters (miniters is fixed manually) + with closing(StringIO()) as our_file: + t = tqdm(_range(10), file=our_file, miniters=1, mininterval=None) + for _ in t: + pass + assert not t.dynamic_miniters + + +@with_setup(pretest, posttest) +def test_big_min_interval(): + """Test large mininterval""" + with closing(StringIO()) as our_file: + for _ in tqdm(_range(2), file=our_file, mininterval=1E10): + pass + assert '50%' not in our_file.getvalue() + + with closing(StringIO()) as our_file: + with tqdm(_range(2), file=our_file, mininterval=1E10) as t: + t.update() + t.update() + assert '50%' not in our_file.getvalue() + + +@with_setup(pretest, posttest) +def test_smoothed_dynamic_min_iters(): + """Test smoothed dynamic miniters""" + timer = DiscreteTimer() + + with closing(StringIO()) as our_file: + with tqdm(total=100, file=our_file, miniters=None, mininterval=0, + smoothing=0.5, maxinterval=0) as t: + cpu_timify(t, timer) + + # Increase 10 iterations at once + t.update(10) + # The next iterations should be partially skipped + for _ in _range(2): + t.update(4) + for _ in _range(20): + t.update() + + out = our_file.getvalue() + assert t.dynamic_miniters + assert ' 0%| | 0/100 [00:00<' in out + assert '10%' in out + assert '14%' not in out + assert '18%' in out + assert '20%' not in out + assert '25%' in out + assert '30%' not in out + assert '32%' in out + + +@with_setup(pretest, posttest) +def test_smoothed_dynamic_min_iters_with_min_interval(): + """Test smoothed dynamic miniters with mininterval""" + timer = DiscreteTimer() + + # In this test, `miniters` should gradually decline + total = 100 + + with closing(StringIO()) as our_file: + # Test manual updating tqdm + with tqdm(total=total, file=our_file, miniters=None, mininterval=1e-3, + smoothing=1, maxinterval=0) as t: + cpu_timify(t, timer) + + t.update(10) + timer.sleep(1e-2) + for _ in _range(4): + t.update() + timer.sleep(1e-2) + out = our_file.getvalue() + assert t.dynamic_miniters + + with closing(StringIO()) as our_file: + # Test iteration-based tqdm + with tqdm(_range(total), file=our_file, miniters=None, + mininterval=0.01, smoothing=1, maxinterval=0) as t2: + cpu_timify(t2, timer) + + for i in t2: + if i >= 10: + timer.sleep(0.1) + if i >= 14: + break + out2 = our_file.getvalue() + + assert t.dynamic_miniters + assert ' 0%| | 0/100 [00:00<' in out + assert '11%' in out and '11%' in out2 + # assert '12%' not in out and '12%' in out2 + assert '13%' in out and '13%' in out2 + assert '14%' in out and '14%' in out2 + + +@with_setup(pretest, posttest) +def test_rlock_creation(): + """Test that importing tqdm does not create multiprocessing objects.""" + import multiprocessing as mp + if sys.version_info < (3, 3): + # unittest.mock is a 3.3+ feature + raise SkipTest + + # Use 'spawn' instead of 'fork' so that the process does not inherit any + # globals that have been constructed by running other tests + ctx = mp.get_context('spawn') + with ctx.Pool(1) as pool: + # The pool will propagate the error if the target method fails + pool.apply(_rlock_creation_target) + + +def _rlock_creation_target(): + """Check that the RLock has not been constructed.""" + from unittest.mock import patch + import multiprocessing as mp + + # Patch the RLock class/method but use the original implementation + with patch('multiprocessing.RLock', wraps=mp.RLock) as rlock_mock: + # Importing the module should not create a lock + from tqdm import tqdm + assert rlock_mock.call_count == 0 + # Creating a progress bar should initialize the lock + with closing(StringIO()) as our_file: + with tqdm(file=our_file) as _: # NOQA + pass + assert rlock_mock.call_count == 1 + # Creating a progress bar again should reuse the lock + with closing(StringIO()) as our_file: + with tqdm(file=our_file) as _: # NOQA + pass + assert rlock_mock.call_count == 1 + + +@with_setup(pretest, posttest) +def test_disable(): + """Test disable""" + with closing(StringIO()) as our_file: + for _ in tqdm(_range(3), file=our_file, disable=True): + pass + assert our_file.getvalue() == '' + + with closing(StringIO()) as our_file: + progressbar = tqdm(total=3, file=our_file, miniters=1, disable=True) + progressbar.update(3) + progressbar.close() + assert our_file.getvalue() == '' + + +@with_setup(pretest, posttest) +def test_infinite_total(): + """Test treatment of infinite total""" + with closing(StringIO()) as our_file: + for _ in tqdm(_range(3), file=our_file, total=float("inf")): + pass + + +@with_setup(pretest, posttest) +def test_nototal(): + """Test unknown total length""" + with closing(StringIO()) as our_file: + for i in tqdm((i for i in range(10)), file=our_file, unit_scale=10): + pass + assert "100it" in our_file.getvalue() + + with closing(StringIO()) as our_file: + for i in tqdm((i for i in range(10)), file=our_file, + bar_format="{l_bar}{bar}{r_bar}"): + pass + assert "10/?" in our_file.getvalue() + + +@with_setup(pretest, posttest) +def test_unit(): + """Test SI unit prefix""" + with closing(StringIO()) as our_file: + for _ in tqdm(_range(3), file=our_file, miniters=1, unit="bytes"): + pass + assert 'bytes/s' in our_file.getvalue() + + +@with_setup(pretest, posttest) +def test_ascii(): + """Test ascii/unicode bar""" + # Test ascii autodetection + with closing(StringIO()) as our_file: + with tqdm(total=10, file=our_file, ascii=None) as t: + assert t.ascii # TODO: this may fail in the future + + # Test ascii bar + with closing(StringIO()) as our_file: + for _ in tqdm(_range(3), total=15, file=our_file, miniters=1, + mininterval=0, ascii=True): + pass + res = our_file.getvalue().strip("\r").split("\r") + assert '7%|6' in res[1] + assert '13%|#3' in res[2] + assert '20%|##' in res[3] + + # Test unicode bar + with closing(UnicodeIO()) as our_file: + with tqdm(total=15, file=our_file, ascii=False, mininterval=0) as t: + for _ in _range(3): + t.update() + res = our_file.getvalue().strip("\r").split("\r") + assert u"7%|\u258b" in res[1] + assert u"13%|\u2588\u258e" in res[2] + assert u"20%|\u2588\u2588" in res[3] + + # Test custom bar + for ascii in [" .oO0", " #"]: + with closing(StringIO()) as our_file: + for _ in tqdm(_range(len(ascii) - 1), file=our_file, miniters=1, + mininterval=0, ascii=ascii, ncols=27): + pass + res = our_file.getvalue().strip("\r").split("\r") + for bar, line in zip(ascii, res): + assert '|' + bar + '|' in line + + +@with_setup(pretest, posttest) +def test_update(): + """Test manual creation and updates""" + res = None + with closing(StringIO()) as our_file: + with tqdm(total=2, file=our_file, miniters=1, mininterval=0) \ + as progressbar: + assert len(progressbar) == 2 + progressbar.update(2) + assert '| 2/2' in our_file.getvalue() + progressbar.desc = 'dynamically notify of 4 increments in total' + progressbar.total = 4 + progressbar.update(-1) + progressbar.update(2) + res = our_file.getvalue() + assert '| 3/4 ' in res + assert 'dynamically notify of 4 increments in total' in res + + +@with_setup(pretest, posttest) +def test_close(): + """Test manual creation and closure and n_instances""" + + # With `leave` option + with closing(StringIO()) as our_file: + progressbar = tqdm(total=3, file=our_file, miniters=10) + progressbar.update(3) + assert '| 3/3 ' not in our_file.getvalue() # Should be blank + assert len(tqdm._instances) == 1 + progressbar.close() + assert len(tqdm._instances) == 0 + assert '| 3/3 ' in our_file.getvalue() + + # Without `leave` option + with closing(StringIO()) as our_file: + progressbar = tqdm(total=3, file=our_file, miniters=10, leave=False) + progressbar.update(3) + progressbar.close() + assert '| 3/3 ' not in our_file.getvalue() # Should be blank + + # With all updates + with closing(StringIO()) as our_file: + assert len(tqdm._instances) == 0 + with tqdm(total=3, file=our_file, miniters=0, mininterval=0, + leave=True) as progressbar: + assert len(tqdm._instances) == 1 + progressbar.update(3) + res = our_file.getvalue() + assert '| 3/3 ' in res # Should be blank + assert '\n' not in res + # close() called + assert len(tqdm._instances) == 0 + + exres = res.rsplit(', ', 1)[0] + res = our_file.getvalue() + assert res[-1] == '\n' + if not res.startswith(exres): + raise AssertionError( + "\n<<< Expected:\n{0}\n>>> Got:\n{1}\n===".format( + exres + ', ...it/s]\n', our_file.getvalue())) + + # Closing after the output stream has closed + with closing(StringIO()) as our_file: + t = tqdm(total=2, file=our_file) + t.update() + t.update() + t.close() + + +@with_setup(pretest, posttest) +def test_smoothing(): + """Test exponential weighted average smoothing""" + timer = DiscreteTimer() + + # -- Test disabling smoothing + with closing(StringIO()) as our_file: + with tqdm(_range(3), file=our_file, smoothing=None, leave=True) as t: + cpu_timify(t, timer) + + for _ in t: + pass + assert '| 3/3 ' in our_file.getvalue() + + # -- Test smoothing + # Compile the regex to find the rate + # 1st case: no smoothing (only use average) + with closing(StringIO()) as our_file2: + with closing(StringIO()) as our_file: + t = tqdm(_range(3), file=our_file2, smoothing=None, leave=True, + miniters=1, mininterval=0) + cpu_timify(t, timer) + + with tqdm(_range(3), file=our_file, smoothing=None, leave=True, + miniters=1, mininterval=0) as t2: + cpu_timify(t2, timer) + + for i in t2: + # Sleep more for first iteration and + # see how quickly rate is updated + if i == 0: + timer.sleep(0.01) + else: + # Need to sleep in all iterations + # to calculate smoothed rate + # (else delta_t is 0!) + timer.sleep(0.001) + t.update() + n_old = len(tqdm._instances) + t.close() + assert len(tqdm._instances) == n_old - 1 + # Get result for iter-based bar + a = progressbar_rate(get_bar(our_file.getvalue(), 3)) + # Get result for manually updated bar + a2 = progressbar_rate(get_bar(our_file2.getvalue(), 3)) + + # 2nd case: use max smoothing (= instant rate) + with closing(StringIO()) as our_file2: + with closing(StringIO()) as our_file: + t = tqdm(_range(3), file=our_file2, smoothing=1, leave=True, + miniters=1, mininterval=0) + cpu_timify(t, timer) + + with tqdm(_range(3), file=our_file, smoothing=1, leave=True, + miniters=1, mininterval=0) as t2: + cpu_timify(t2, timer) + + for i in t2: + if i == 0: + timer.sleep(0.01) + else: + timer.sleep(0.001) + t.update() + t.close() + # Get result for iter-based bar + b = progressbar_rate(get_bar(our_file.getvalue(), 3)) + # Get result for manually updated bar + b2 = progressbar_rate(get_bar(our_file2.getvalue(), 3)) + + # 3rd case: use medium smoothing + with closing(StringIO()) as our_file2: + with closing(StringIO()) as our_file: + t = tqdm(_range(3), file=our_file2, smoothing=0.5, leave=True, + miniters=1, mininterval=0) + cpu_timify(t, timer) + + t2 = tqdm(_range(3), file=our_file, smoothing=0.5, leave=True, + miniters=1, mininterval=0) + cpu_timify(t2, timer) + + for i in t2: + if i == 0: + timer.sleep(0.01) + else: + timer.sleep(0.001) + t.update() + t2.close() + t.close() + # Get result for iter-based bar + c = progressbar_rate(get_bar(our_file.getvalue(), 3)) + # Get result for manually updated bar + c2 = progressbar_rate(get_bar(our_file2.getvalue(), 3)) + + # Check that medium smoothing's rate is between no and max smoothing rates + assert a <= c <= b + assert a2 <= c2 <= b2 + + +@with_setup(pretest, posttest) +def test_deprecated_nested(): + """Test nested progress bars""" + if nt_and_no_colorama: + raise SkipTest + # TODO: test degradation on windows without colorama? + + # Artificially test nested loop printing + # Without leave + our_file = StringIO() + try: + tqdm(total=2, file=our_file, nested=True) + except TqdmDeprecationWarning: + if """`nested` is deprecated and automated. +Use `position` instead for manual control.""" not in our_file.getvalue(): + raise + else: + raise DeprecationError("Should not allow nested kwarg") + + +@with_setup(pretest, posttest) +def test_bar_format(): + """Test custom bar formatting""" + with closing(StringIO()) as our_file: + bar_format = r'{l_bar}{bar}|{n_fmt}/{total_fmt}-{n}/{total}{percentage}{rate}{rate_fmt}{elapsed}{remaining}' # NOQA + for _ in trange(2, file=our_file, leave=True, bar_format=bar_format): + pass + out = our_file.getvalue() + assert "\r 0%| |0/2-0/20.0None?it/s00:00?\r" in out + + # Test unicode string auto conversion + with closing(StringIO()) as our_file: + bar_format = r'hello world' + with tqdm(ascii=False, bar_format=bar_format, file=our_file) as t: + assert isinstance(t.bar_format, _unicode) + + +@with_setup(pretest, posttest) +def test_custom_format(): + """Test adding additional derived format arguments""" + class TqdmExtraFormat(tqdm): + """Provides a `total_time` format parameter""" + @property + def format_dict(self): + d = super(TqdmExtraFormat, self).format_dict + total_time = d["elapsed"] * (d["total"] or 0) / max(d["n"], 1) + d.update(total_time=self.format_interval(total_time) + " in total") + return d + + with closing(StringIO()) as our_file: + for i in TqdmExtraFormat( + range(10), file=our_file, + bar_format="{total_time}: {percentage:.0f}%|{bar}{r_bar}"): + pass + assert "00:00 in total" in our_file.getvalue() + + +@with_setup(pretest, posttest) +def test_unpause(): + """Test unpause""" + timer = DiscreteTimer() + with closing(StringIO()) as our_file: + t = trange(10, file=our_file, leave=True, mininterval=0) + cpu_timify(t, timer) + timer.sleep(0.01) + t.update() + timer.sleep(0.01) + t.update() + timer.sleep(0.1) # longer wait time + t.unpause() + timer.sleep(0.01) + t.update() + timer.sleep(0.01) + t.update() + t.close() + r_before = progressbar_rate(get_bar(our_file.getvalue(), 2)) + r_after = progressbar_rate(get_bar(our_file.getvalue(), 3)) + assert r_before == r_after + + +@with_setup(pretest, posttest) +def test_reset(): + """Test resetting a bar for re-use""" + with closing(StringIO()) as our_file: + with tqdm(total=10, file=our_file, + miniters=1, mininterval=0, maxinterval=0) as t: + t.update(9) + t.reset() + t.update() + t.reset(total=12) + t.update(10) + assert '| 1/10' in our_file.getvalue() + assert '| 10/12' in our_file.getvalue() + + +@with_setup(pretest, posttest) +def test_position(): + """Test positioned progress bars""" + if nt_and_no_colorama: + raise SkipTest + + # Artificially test nested loop printing + # Without leave + our_file = StringIO() + kwargs = dict(file=our_file, miniters=1, mininterval=0, maxinterval=0) + t = tqdm(total=2, desc='pos2 bar', leave=False, position=2, **kwargs) + t.update() + t.close() + out = our_file.getvalue() + res = [m[0] for m in RE_pos.findall(out)] + exres = ['\n\n\rpos2 bar: 0%', + '\n\n\rpos2 bar: 50%', + '\n\n\r '] + + pos_line_diff(res, exres) + + # Test iteration-based tqdm positioning + our_file = StringIO() + kwargs["file"] = our_file + for _ in trange(2, desc='pos0 bar', position=0, **kwargs): + for _ in trange(2, desc='pos1 bar', position=1, **kwargs): + for _ in trange(2, desc='pos2 bar', position=2, **kwargs): + pass + out = our_file.getvalue() + res = [m[0] for m in RE_pos.findall(out)] + exres = ['\rpos0 bar: 0%', + '\n\rpos1 bar: 0%', + '\n\n\rpos2 bar: 0%', + '\n\n\rpos2 bar: 50%', + '\n\n\rpos2 bar: 100%', + '\rpos2 bar: 100%', + '\n\n\rpos1 bar: 50%', + '\n\n\rpos2 bar: 0%', + '\n\n\rpos2 bar: 50%', + '\n\n\rpos2 bar: 100%', + '\rpos2 bar: 100%', + '\n\n\rpos1 bar: 100%', + '\rpos1 bar: 100%', + '\n\rpos0 bar: 50%', + '\n\rpos1 bar: 0%', + '\n\n\rpos2 bar: 0%', + '\n\n\rpos2 bar: 50%', + '\n\n\rpos2 bar: 100%', + '\rpos2 bar: 100%', + '\n\n\rpos1 bar: 50%', + '\n\n\rpos2 bar: 0%', + '\n\n\rpos2 bar: 50%', + '\n\n\rpos2 bar: 100%', + '\rpos2 bar: 100%', + '\n\n\rpos1 bar: 100%', + '\rpos1 bar: 100%', + '\n\rpos0 bar: 100%', + '\rpos0 bar: 100%', + '\n'] + pos_line_diff(res, exres) + + # Test manual tqdm positioning + our_file = StringIO() + kwargs["file"] = our_file + kwargs["total"] = 2 + t1 = tqdm(desc='pos0 bar', position=0, **kwargs) + t2 = tqdm(desc='pos1 bar', position=1, **kwargs) + t3 = tqdm(desc='pos2 bar', position=2, **kwargs) + for _ in _range(2): + t1.update() + t3.update() + t2.update() + out = our_file.getvalue() + res = [m[0] for m in RE_pos.findall(out)] + exres = ['\rpos0 bar: 0%', + '\n\rpos1 bar: 0%', + '\n\n\rpos2 bar: 0%', + '\rpos0 bar: 50%', + '\n\n\rpos2 bar: 50%', + '\n\rpos1 bar: 50%', + '\rpos0 bar: 100%', + '\n\n\rpos2 bar: 100%', + '\n\rpos1 bar: 100%'] + pos_line_diff(res, exres) + t1.close() + t2.close() + t3.close() + + # Test auto repositioning of bars when a bar is prematurely closed + # tqdm._instances.clear() # reset number of instances + with closing(StringIO()) as our_file: + t1 = tqdm(total=10, file=our_file, desc='1.pos0 bar', mininterval=0) + t2 = tqdm(total=10, file=our_file, desc='2.pos1 bar', mininterval=0) + t3 = tqdm(total=10, file=our_file, desc='3.pos2 bar', mininterval=0) + res = [m[0] for m in RE_pos.findall(our_file.getvalue())] + exres = ['\r1.pos0 bar: 0%', + '\n\r2.pos1 bar: 0%', + '\n\n\r3.pos2 bar: 0%'] + pos_line_diff(res, exres) + + t2.close() + t4 = tqdm(total=10, file=our_file, desc='4.pos2 bar', mininterval=0) + t1.update(1) + t3.update(1) + t4.update(1) + res = [m[0] for m in RE_pos.findall(our_file.getvalue())] + exres = ['\r1.pos0 bar: 0%', + '\n\r2.pos1 bar: 0%', + '\n\n\r3.pos2 bar: 0%', + '\r2.pos1 bar: 0%', + '\n\n\r4.pos2 bar: 0%', + '\r1.pos0 bar: 10%', + '\n\n\r3.pos2 bar: 10%', + '\n\r4.pos2 bar: 10%'] + pos_line_diff(res, exres) + t4.close() + t3.close() + t1.close() + + +@with_setup(pretest, posttest) +def test_set_description(): + """Test set description""" + with closing(StringIO()) as our_file: + with tqdm(desc='Hello', file=our_file) as t: + assert t.desc == 'Hello' + t.set_description_str('World') + assert t.desc == 'World' + t.set_description() + assert t.desc == '' + t.set_description('Bye') + assert t.desc == 'Bye: ' + assert "World" in our_file.getvalue() + + # without refresh + with closing(StringIO()) as our_file: + with tqdm(desc='Hello', file=our_file) as t: + assert t.desc == 'Hello' + t.set_description_str('World', False) + assert t.desc == 'World' + t.set_description(None, False) + assert t.desc == '' + assert "World" not in our_file.getvalue() + + # unicode + with closing(StringIO()) as our_file: + with tqdm(total=10, file=our_file) as t: + t.set_description(u"\xe1\xe9\xed\xf3\xfa") + + +@with_setup(pretest, posttest) +def test_deprecated_gui(): + """Test internal GUI properties""" + # Check: StatusPrinter iff gui is disabled + with closing(StringIO()) as our_file: + t = tqdm(total=2, gui=True, file=our_file, miniters=1, mininterval=0) + assert not hasattr(t, "sp") + try: + t.update(1) + except TqdmDeprecationWarning as e: + if ('Please use `tqdm.gui.tqdm(...)` instead of' + ' `tqdm(..., gui=True)`') \ + not in our_file.getvalue(): + raise e + else: + raise DeprecationError('Should not allow manual gui=True without' + ' overriding __iter__() and update()') + finally: + t._instances.clear() + # t.close() + # len(tqdm._instances) += 1 # undo the close() decrement + + t = tqdm(_range(3), gui=True, file=our_file, miniters=1, mininterval=0) + try: + for _ in t: + pass + except TqdmDeprecationWarning as e: + if ('Please use `tqdm.gui.tqdm(...)` instead of' + ' `tqdm(..., gui=True)`') \ + not in our_file.getvalue(): + raise e + else: + raise DeprecationError('Should not allow manual gui=True without' + ' overriding __iter__() and update()') + finally: + t._instances.clear() + # t.close() + # len(tqdm._instances) += 1 # undo the close() decrement + + with tqdm(total=1, gui=False, file=our_file) as t: + assert hasattr(t, "sp") + + +@with_setup(pretest, posttest) +def test_cmp(): + """Test comparison functions""" + with closing(StringIO()) as our_file: + t0 = tqdm(total=10, file=our_file) + t1 = tqdm(total=10, file=our_file) + t2 = tqdm(total=10, file=our_file) + + assert t0 < t1 + assert t2 >= t0 + assert t0 <= t2 + + t3 = tqdm(total=10, file=our_file) + t4 = tqdm(total=10, file=our_file) + t5 = tqdm(total=10, file=our_file) + t5.close() + t6 = tqdm(total=10, file=our_file) + + assert t3 != t4 + assert t3 > t2 + assert t5 == t6 + t6.close() + t4.close() + t3.close() + t2.close() + t1.close() + t0.close() + + +@with_setup(pretest, posttest) +def test_repr(): + """Test representation""" + with closing(StringIO()) as our_file: + with tqdm(total=10, ascii=True, file=our_file) as t: + assert str(t) == ' 0%| | 0/10 [00:00<?, ?it/s]' + + +@with_setup(pretest, posttest) +def test_clear(): + """Test clearing bar display""" + with closing(StringIO()) as our_file: + t1 = tqdm(total=10, file=our_file, desc='pos0 bar', + bar_format='{l_bar}') + t2 = trange(10, file=our_file, desc='pos1 bar', bar_format='{l_bar}') + before = squash_ctrlchars(our_file.getvalue()) + t2.clear() + t1.clear() + after = squash_ctrlchars(our_file.getvalue()) + t1.close() + t2.close() + assert before == ['pos0 bar: 0%|', 'pos1 bar: 0%|'] + assert after == ['', ''] + + +@with_setup(pretest, posttest) +def test_clear_disabled(): + """Test clearing bar display""" + with closing(StringIO()) as our_file: + with tqdm(total=10, file=our_file, desc='pos0 bar', disable=True, + bar_format='{l_bar}') as t: + t.clear() + assert our_file.getvalue() == '' + + +@with_setup(pretest, posttest) +def test_refresh(): + """Test refresh bar display""" + with closing(StringIO()) as our_file: + t1 = tqdm(total=10, file=our_file, desc='pos0 bar', + bar_format='{l_bar}', mininterval=999, miniters=999) + t2 = tqdm(total=10, file=our_file, desc='pos1 bar', + bar_format='{l_bar}', mininterval=999, miniters=999) + t1.update() + t2.update() + before = squash_ctrlchars(our_file.getvalue()) + t1.refresh() + t2.refresh() + after = squash_ctrlchars(our_file.getvalue()) + t1.close() + t2.close() + + # Check that refreshing indeed forced the display to use realtime state + assert before == [u'pos0 bar: 0%|', u'pos1 bar: 0%|'] + assert after == [u'pos0 bar: 10%|', u'pos1 bar: 10%|'] + + +@with_setup(pretest, posttest) +def test_disabled_refresh(): + """Test refresh bar display""" + with closing(StringIO()) as our_file: + with tqdm(total=10, file=our_file, desc='pos0 bar', disable=True, + bar_format='{l_bar}', mininterval=999, miniters=999) as t: + t.update() + t.refresh() + + assert our_file.getvalue() == '' + + +@with_setup(pretest, posttest) +def test_write(): + """Test write messages""" + s = "Hello world" + with closing(StringIO()) as our_file: + # Change format to keep only left part w/o bar and it/s rate + t1 = tqdm(total=10, file=our_file, desc='pos0 bar', + bar_format='{l_bar}', mininterval=0, miniters=1) + t2 = trange(10, file=our_file, desc='pos1 bar', bar_format='{l_bar}', + mininterval=0, miniters=1) + t3 = tqdm(total=10, file=our_file, desc='pos2 bar', + bar_format='{l_bar}', mininterval=0, miniters=1) + t1.update() + t2.update() + t3.update() + before = our_file.getvalue() + + # Write msg and see if bars are correctly redrawn below the msg + t1.write(s, file=our_file) # call as an instance method + tqdm.write(s, file=our_file) # call as a class method + after = our_file.getvalue() + + t1.close() + t2.close() + t3.close() + + before_squashed = squash_ctrlchars(before) + after_squashed = squash_ctrlchars(after) + + assert after_squashed == [s, s] + before_squashed + + # Check that no bar clearing if different file + with closing(StringIO()) as our_file_bar: + with closing(StringIO()) as our_file_write: + t1 = tqdm(total=10, file=our_file_bar, desc='pos0 bar', + bar_format='{l_bar}', mininterval=0, miniters=1) + + t1.update() + before_bar = our_file_bar.getvalue() + + tqdm.write(s, file=our_file_write) + + after_bar = our_file_bar.getvalue() + t1.close() + + assert before_bar == after_bar + + # Test stdout/stderr anti-mixup strategy + # Backup stdout/stderr + stde = sys.stderr + stdo = sys.stdout + # Mock stdout/stderr + with closing(StringIO()) as our_stderr: + with closing(StringIO()) as our_stdout: + sys.stderr = our_stderr + sys.stdout = our_stdout + t1 = tqdm(total=10, file=sys.stderr, desc='pos0 bar', + bar_format='{l_bar}', mininterval=0, miniters=1) + + t1.update() + before_err = sys.stderr.getvalue() + before_out = sys.stdout.getvalue() + + tqdm.write(s, file=sys.stdout) + after_err = sys.stderr.getvalue() + after_out = sys.stdout.getvalue() + + t1.close() + + assert before_err == '\rpos0 bar: 0%|\rpos0 bar: 10%|' + assert before_out == '' + after_err_res = [m[0] for m in RE_pos.findall(after_err)] + exres = ['\rpos0 bar: 0%|', + '\rpos0 bar: 10%|', + '\r ', + '\r\rpos0 bar: 10%|'] + pos_line_diff(after_err_res, exres) + assert after_out == s + '\n' + # Restore stdout and stderr + sys.stderr = stde + sys.stdout = stdo + + +@with_setup(pretest, posttest) +def test_len(): + """Test advance len (numpy array shape)""" + try: + import numpy as np + except ImportError: + raise SkipTest + with closing(StringIO()) as f: + with tqdm(np.zeros((3, 4)), file=f) as t: + assert len(t) == 3 + + +@with_setup(pretest, posttest) +def test_autodisable_disable(): + """Test autodisable will disable on non-TTY""" + with closing(StringIO()) as our_file: + with tqdm(total=10, disable=None, file=our_file) as t: + t.update(3) + assert our_file.getvalue() == '' + + +@with_setup(pretest, posttest) +def test_autodisable_enable(): + """Test autodisable will not disable on TTY""" + with closing(StringIO()) as our_file: + setattr(our_file, "isatty", lambda: True) + with tqdm(total=10, disable=None, file=our_file) as t: + t.update() + assert our_file.getvalue() != '' + + +@with_setup(pretest, posttest) +def test_deprecation_exception(): + def test_TqdmDeprecationWarning(): + with closing(StringIO()) as our_file: + raise (TqdmDeprecationWarning('Test!', fp_write=getattr( + our_file, 'write', sys.stderr.write))) + + def test_TqdmDeprecationWarning_nofpwrite(): + raise (TqdmDeprecationWarning('Test!', fp_write=None)) + + assert_raises(TqdmDeprecationWarning, test_TqdmDeprecationWarning) + assert_raises(Exception, test_TqdmDeprecationWarning_nofpwrite) + + +@with_setup(pretest, posttest) +def test_postfix(): + """Test postfix""" + postfix = {'float': 0.321034, 'gen': 543, 'str': 'h', 'lst': [2]} + postfix_order = (('w', 'w'), ('a', 0)) # no need for OrderedDict + expected = ['float=0.321', 'gen=543', 'lst=[2]', 'str=h'] + expected_order = ['w=w', 'a=0', 'float=0.321', 'gen=543', 'lst=[2]', + 'str=h'] + + # Test postfix set at init + with closing(StringIO()) as our_file: + with tqdm(total=10, file=our_file, desc='pos0 bar', + bar_format='{r_bar}', postfix=postfix) as t1: + t1.refresh() + out = our_file.getvalue() + + # Test postfix set after init + with closing(StringIO()) as our_file: + with trange(10, file=our_file, desc='pos1 bar', bar_format='{r_bar}', + postfix=None) as t2: + t2.set_postfix(**postfix) + t2.refresh() + out2 = our_file.getvalue() + + # Order of items in dict may change, so need a loop to check per item + for res in expected: + assert res in out + assert res in out2 + + # Test postfix (with ordered dict and no refresh) set after init + with closing(StringIO()) as our_file: + with trange(10, file=our_file, desc='pos2 bar', bar_format='{r_bar}', + postfix=None) as t3: + t3.set_postfix(postfix_order, False, **postfix) + t3.refresh() # explicit external refresh + out3 = our_file.getvalue() + + out3 = out3[1:-1].split(', ')[3:] + assert out3 == expected_order + + # Test postfix (with ordered dict and refresh) set after init + with closing(StringIO()) as our_file: + with trange(10, file=our_file, desc='pos2 bar', + bar_format='{r_bar}', postfix=None) as t4: + t4.set_postfix(postfix_order, True, **postfix) + t4.refresh() # double refresh + out4 = our_file.getvalue() + + assert out4.count('\r') > out3.count('\r') + assert out4.count(", ".join(expected_order)) == 2 + + # Test setting postfix string directly + with closing(StringIO()) as our_file: + with trange(10, file=our_file, desc='pos2 bar', bar_format='{r_bar}', + postfix=None) as t5: + t5.set_postfix_str("Hello", False) + t5.set_postfix_str("World") + out5 = our_file.getvalue() + + assert "Hello" not in out5 + out5 = out5[1:-1].split(', ')[3:] + assert out5 == ["World"] + + +def test_postfix_direct(): + """Test directly assigning non-str objects to postfix""" + with closing(StringIO()) as our_file: + with tqdm(total=10, file=our_file, miniters=1, mininterval=0, + bar_format="{postfix[0][name]} {postfix[1]:>5.2f}", + postfix=[dict(name="foo"), 42]) as t: + for i in range(10): + if i % 2: + t.postfix[0]["name"] = "abcdefghij"[i] + else: + t.postfix[1] = i + t.update() + res = our_file.getvalue() + assert "f 6.00" in res + assert "h 6.00" in res + assert "h 8.00" in res + assert "j 8.00" in res + + +@contextmanager +def std_out_err_redirect_tqdm(tqdm_file=sys.stderr): + orig_out_err = sys.stdout, sys.stderr + try: + sys.stdout = sys.stderr = DummyTqdmFile(tqdm_file) + yield orig_out_err[0] + # Relay exceptions + except Exception as exc: + raise exc + # Always restore sys.stdout/err if necessary + finally: + sys.stdout, sys.stderr = orig_out_err + + +@with_setup(pretest, posttest) +def test_file_redirection(): + """Test redirection of output""" + with closing(StringIO()) as our_file: + # Redirect stdout to tqdm.write() + with std_out_err_redirect_tqdm(tqdm_file=our_file): + for _ in trange(3): + print("Such fun") + res = our_file.getvalue() + assert res.count("Such fun\n") == 3 + assert "0/3" in res + assert "3/3" in res + + +@with_setup(pretest, posttest) +def test_external_write(): + """Test external write mode""" + with closing(StringIO()) as our_file: + # Redirect stdout to tqdm.write() + for _ in trange(3, file=our_file): + del tqdm._lock # classmethod should be able to recreate lock + with tqdm.external_write_mode(file=our_file): + our_file.write("Such fun\n") + res = our_file.getvalue() + assert res.count("Such fun\n") == 3 + assert "0/3" in res + assert "3/3" in res + + +@with_setup(pretest, posttest) +def test_unit_scale(): + """Test numeric `unit_scale`""" + with closing(StringIO()) as our_file: + for _ in tqdm(_range(9), unit_scale=9, file=our_file, + miniters=1, mininterval=0): + pass + out = our_file.getvalue() + assert '81/81' in out + + +@with_setup(pretest, posttest) +def test_threading(): + """Test multiprocess/thread-realted features""" + from multiprocessing import RLock + try: + mp_lock = RLock() + except OSError: + pass + else: + tqdm.set_lock(mp_lock) + # TODO: test interleaved output #445 + + +@with_setup(pretest, posttest) +def test_bool(): + """Test boolean cast""" + def internal(our_file, disable): + kwargs = dict(file=our_file, disable=disable) + with trange(10, **kwargs) as t: + assert t + with trange(0, **kwargs) as t: + assert not t + with tqdm(total=10, **kwargs) as t: + assert bool(t) + with tqdm(total=0, **kwargs) as t: + assert not bool(t) + with tqdm([], **kwargs) as t: + assert not t + with tqdm([0], **kwargs) as t: + assert t + with tqdm((x for x in []), **kwargs) as t: + assert t + with tqdm((x for x in [1, 2, 3]), **kwargs) as t: + assert t + with tqdm(**kwargs) as t: + try: + print(bool(t)) + except TypeError: + pass + else: + raise TypeError("Expected bool(tqdm()) to fail") + + # test with and without disable + with closing(StringIO()) as our_file: + internal(our_file, False) + internal(our_file, True) + + +def backendCheck(module): + """Test tqdm-like module fallback""" + tn = module.tqdm + tr = module.trange + + with closing(StringIO()) as our_file: + with tn(total=10, file=our_file) as t: + assert len(t) == 10 + with tr(1337) as t: + assert len(t) == 1337 + + +@with_setup(pretest, posttest) +def test_auto(): + """Test auto fallback""" + from tqdm import autonotebook, auto + backendCheck(autonotebook) + backendCheck(auto) + + +@with_setup(pretest, posttest) +def test_wrapattr(): + """Test wrapping file-like objects""" + data = "a twenty-char string" + + with closing(StringIO()) as our_file: + with closing(StringIO()) as writer: + with tqdm.wrapattr( + writer, "write", file=our_file, bytes=True) as wrap: + wrap.write(data) + res = writer.getvalue() + assert data == res + res = our_file.getvalue() + assert ('%.1fB [' % len(data)) in res + + with closing(StringIO()) as our_file: + with closing(StringIO()) as writer: + with tqdm.wrapattr( + writer, "write", file=our_file, bytes=False) as wrap: + wrap.write(data) + res = our_file.getvalue() + assert ('%dit [' % len(data)) in res + + +@with_setup(pretest, posttest) +def test_float_progress(): + """Test float totals""" + with closing(StringIO()) as our_file: + with trange(10, total=9.6, file=our_file) as t: + with catch_warnings(record=True) as w: + simplefilter("always") + for i in t: + if i < 9: + assert not w + assert w + assert "clamping frac" in str(w[-1].message) + + +@with_setup(pretest, posttest) +def test_screen_shape(): + """Test screen shape""" + # ncols + with closing(StringIO()) as our_file: + with trange(10, file=our_file, ncols=50) as t: + list(t) + + res = our_file.getvalue() + assert all(len(i) == 50 for i in get_bar(res)) + + # no second/third bar, leave=False + with closing(StringIO()) as our_file: + kwargs = dict(file=our_file, ncols=50, nrows=2, miniters=0, + mininterval=0, leave=False) + with trange(10, desc="one", **kwargs) as t1: + with trange(10, desc="two", **kwargs) as t2: + with trange(10, desc="three", **kwargs) as t3: + list(t3) + list(t2) + list(t1) + + res = our_file.getvalue() + assert "one" in res + assert "two" not in res + assert "three" not in res + assert "\n\n" not in res + assert "more hidden" in res + # double-check ncols + assert all(len(i) == 50 for i in get_bar(res) + if i.strip() and "more hidden" not in i) + + # all bars, leave=True + with closing(StringIO()) as our_file: + kwargs = dict(file=our_file, ncols=50, nrows=2, miniters=0, + mininterval=0) + with trange(10, desc="one", **kwargs) as t1: + with trange(10, desc="two", **kwargs) as t2: + assert "two" not in our_file.getvalue() + with trange(10, desc="three", **kwargs) as t3: + assert "three" not in our_file.getvalue() + list(t3) + list(t2) + list(t1) + + res = our_file.getvalue() + assert "one" in res + assert "two" in res + assert "three" in res + assert "\n\n" not in res + assert "more hidden" in res + # double-check ncols + assert all(len(i) == 50 for i in get_bar(res) + if i.strip() and "more hidden" not in i) + + # second bar becomes first, leave=False + with closing(StringIO()) as our_file: + kwargs = dict(file=our_file, ncols=50, nrows=2, miniters=0, + mininterval=0, leave=False) + t1 = tqdm(total=10, desc="one", **kwargs) + with tqdm(total=10, desc="two", **kwargs) as t2: + t1.update() + t2.update() + t1.close() + res = our_file.getvalue() + assert "one" in res + assert "two" not in res + assert "more hidden" in res + t2.update() + + res = our_file.getvalue() + assert "two" in res diff --git a/libs/tqdm/tests/tests_version.py b/libs/tqdm/tests/tests_version.py new file mode 100644 index 000000000..226b99802 --- /dev/null +++ b/libs/tqdm/tests/tests_version.py @@ -0,0 +1,12 @@ +import re + + +def test_version(): + """Test version string""" + from tqdm import __version__ + version_parts = re.split('[.-]', __version__) + assert 3 <= len(version_parts) # must have at least Major.minor.patch + try: + map(int, version_parts[:3]) + except ValueError: + raise TypeError('Version Major.minor.patch must be 3 integers') diff --git a/libs/tqdm/tqdm.1 b/libs/tqdm/tqdm.1 new file mode 100644 index 000000000..f0c692452 --- /dev/null +++ b/libs/tqdm/tqdm.1 @@ -0,0 +1,272 @@ +.\" Automatically generated by Pandoc 1.19.2.1 +.\" +.TH "TQDM" "1" "2015\-2020" "tqdm User Manuals" "" +.hy +.SH NAME +.PP +tqdm \- fast, extensible progress bar for Python and CLI +.SH SYNOPSIS +.PP +tqdm [\f[I]options\f[]] +.SH DESCRIPTION +.PP +See <https://github.com/tqdm/tqdm>. +Can be used as a pipe: +.IP +.nf +\f[C] +$\ #\ count\ lines\ of\ code +$\ cat\ *.py\ |\ tqdm\ |\ wc\ \-l +327it\ [00:00,\ 981773.38it/s] +327 + +$\ #\ find\ all\ files +$\ find\ .\ \-name\ "*.py"\ |\ tqdm\ |\ wc\ \-l +432it\ [00:00,\ 833842.30it/s] +432 + +#\ ...\ and\ more\ info +$\ find\ .\ \-name\ \[aq]*.py\[aq]\ \-exec\ wc\ \-l\ \\{}\ \\;\ \\ +\ \ |\ tqdm\ \-\-total\ 432\ \-\-unit\ files\ \-\-desc\ counting\ \\ +\ \ |\ awk\ \[aq]{\ sum\ +=\ $1\ };\ END\ {\ print\ sum\ }\[aq] +counting:\ 100%|█████████|\ 432/432\ [00:00<00:00,\ 794361.83files/s] +131998 +\f[] +.fi +.SH OPTIONS +.TP +.B \-h, \-\-help +Print this help and exit +.RS +.RE +.TP +.B \-v, \-\-version +Print version and exit +.RS +.RE +.TP +.B \-\-desc=\f[I]desc\f[] +str, optional. +Prefix for the progressbar. +.RS +.RE +.TP +.B \-\-total=\f[I]total\f[] +int or float, optional. +The number of expected iterations. +If unspecified, len(iterable) is used if possible. +If float("inf") or as a last resort, only basic progress statistics are +displayed (no ETA, no progressbar). +If \f[C]gui\f[] is True and this parameter needs subsequent updating, +specify an initial arbitrary large positive number, e.g. +9e9. +.RS +.RE +.TP +.B \-\-leave=\f[I]leave\f[] +bool, optional. +If [default: True], keeps all traces of the progressbar upon termination +of iteration. +If \f[C]None\f[], will leave only if \f[C]position\f[] is \f[C]0\f[]. +.RS +.RE +.TP +.B \-\-ncols=\f[I]ncols\f[] +int, optional. +The width of the entire output message. +If specified, dynamically resizes the progressbar to stay within this +bound. +If unspecified, attempts to use environment width. +The fallback is a meter width of 10 and no limit for the counter and +statistics. +If 0, will not print any meter (only stats). +.RS +.RE +.TP +.B \-\-mininterval=\f[I]mininterval\f[] +float, optional. +Minimum progress display update interval [default: 0.1] seconds. +.RS +.RE +.TP +.B \-\-maxinterval=\f[I]maxinterval\f[] +float, optional. +Maximum progress display update interval [default: 10] seconds. +Automatically adjusts \f[C]miniters\f[] to correspond to +\f[C]mininterval\f[] after long display update lag. +Only works if \f[C]dynamic_miniters\f[] or monitor thread is enabled. +.RS +.RE +.TP +.B \-\-miniters=\f[I]miniters\f[] +int or float, optional. +Minimum progress display update interval, in iterations. +If 0 and \f[C]dynamic_miniters\f[], will automatically adjust to equal +\f[C]mininterval\f[] (more CPU efficient, good for tight loops). +If > 0, will skip display of specified number of iterations. +Tweak this and \f[C]mininterval\f[] to get very efficient loops. +If your progress is erratic with both fast and slow iterations (network, +skipping items, etc) you should set miniters=1. +.RS +.RE +.TP +.B \-\-ascii=\f[I]ascii\f[] +bool or str, optional. +If unspecified or False, use unicode (smooth blocks) to fill the meter. +The fallback is to use ASCII characters " 123456789#". +.RS +.RE +.TP +.B \-\-disable=\f[I]disable\f[] +bool, optional. +Whether to disable the entire progressbar wrapper [default: False]. +If set to None, disable on non\-TTY. +.RS +.RE +.TP +.B \-\-unit=\f[I]unit\f[] +str, optional. +String that will be used to define the unit of each iteration [default: +it]. +.RS +.RE +.TP +.B \-\-unit_scale=\f[I]unit_scale\f[] +bool or int or float, optional. +If 1 or True, the number of iterations will be reduced/scaled +automatically and a metric prefix following the International System of +Units standard will be added (kilo, mega, etc.) [default: False]. +If any other non\-zero number, will scale \f[C]total\f[] and \f[C]n\f[]. +.RS +.RE +.TP +.B \-\-dynamic_ncols=\f[I]dynamic_ncols\f[] +bool, optional. +If set, constantly alters \f[C]ncols\f[] and \f[C]nrows\f[] to the +environment (allowing for window resizes) [default: False]. +.RS +.RE +.TP +.B \-\-smoothing=\f[I]smoothing\f[] +float, optional. +Exponential moving average smoothing factor for speed estimates (ignored +in GUI mode). +Ranges from 0 (average speed) to 1 (current/instantaneous speed) +[default: 0.3]. +.RS +.RE +.TP +.B \-\-bar_format=\f[I]bar_format\f[] +str, optional. +Specify a custom bar string formatting. +May impact performance. +[default: \[aq]{l_bar}{bar}{r_bar}\[aq]], where l_bar=\[aq]{desc}: +{percentage:3.0f}%|\[aq] and r_bar=\[aq]| {n_fmt}/{total_fmt} +[{elapsed}<{remaining}, \[aq] \[aq]{rate_fmt}{postfix}]\[aq] Possible +vars: l_bar, bar, r_bar, n, n_fmt, total, total_fmt, percentage, +elapsed, elapsed_s, ncols, nrows, desc, unit, rate, rate_fmt, +rate_noinv, rate_noinv_fmt, rate_inv, rate_inv_fmt, postfix, +unit_divisor, remaining, remaining_s. +Note that a trailing ": " is automatically removed after {desc} if the +latter is empty. +.RS +.RE +.TP +.B \-\-initial=\f[I]initial\f[] +int or float, optional. +The initial counter value. +Useful when restarting a progress bar [default: 0]. +If using float, consider specifying \f[C]{n:.3f}\f[] or similar in +\f[C]bar_format\f[], or specifying \f[C]unit_scale\f[]. +.RS +.RE +.TP +.B \-\-position=\f[I]position\f[] +int, optional. +Specify the line offset to print this bar (starting from 0) Automatic if +unspecified. +Useful to manage multiple bars at once (eg, from threads). +.RS +.RE +.TP +.B \-\-postfix=\f[I]postfix\f[] +dict or *, optional. +Specify additional stats to display at the end of the bar. +Calls \f[C]set_postfix(**postfix)\f[] if possible (dict). +.RS +.RE +.TP +.B \-\-unit_divisor=\f[I]unit_divisor\f[] +float, optional. +[default: 1000], ignored unless \f[C]unit_scale\f[] is True. +.RS +.RE +.TP +.B \-\-write_bytes=\f[I]write_bytes\f[] +bool, optional. +If (default: None) and \f[C]file\f[] is unspecified, bytes will be +written in Python 2. +If \f[C]True\f[] will also write bytes. +In all other cases will default to unicode. +.RS +.RE +.TP +.B \-\-lock_args=\f[I]lock_args\f[] +tuple, optional. +Passed to \f[C]refresh\f[] for intermediate output (initialisation, +iterating, and updating). +.RS +.RE +.TP +.B \-\-nrows=\f[I]nrows\f[] +int, optional. +The screen height. +If specified, hides nested bars outside this bound. +If unspecified, attempts to use environment height. +The fallback is 20. +.RS +.RE +.TP +.B \-\-delim=\f[I]delim\f[] +chr, optional. +Delimiting character [default: \[aq]\\n\[aq]]. +Use \[aq]\\0\[aq] for null. +N.B.: on Windows systems, Python converts \[aq]\\n\[aq] to +\[aq]\\r\\n\[aq]. +.RS +.RE +.TP +.B \-\-buf_size=\f[I]buf_size\f[] +int, optional. +String buffer size in bytes [default: 256] used when \f[C]delim\f[] is +specified. +.RS +.RE +.TP +.B \-\-bytes=\f[I]bytes\f[] +bool, optional. +If true, will count bytes, ignore \f[C]delim\f[], and default +\f[C]unit_scale\f[] to True, \f[C]unit_divisor\f[] to 1024, and +\f[C]unit\f[] to \[aq]B\[aq]. +.RS +.RE +.TP +.B \-\-manpath=\f[I]manpath\f[] +str, optional. +Directory in which to install tqdm man pages. +.RS +.RE +.TP +.B \-\-comppath=\f[I]comppath\f[] +str, optional. +Directory in which to place tqdm completion. +.RS +.RE +.TP +.B \-\-log=\f[I]log\f[] +str, optional. +CRITICAL|FATAL|ERROR|WARN(ING)|[default: \[aq]INFO\[aq]]|DEBUG|NOTSET. +.RS +.RE +.SH AUTHORS +tqdm developers <https://github.com/tqdm>. diff --git a/libs/tqdm/utils.py b/libs/tqdm/utils.py new file mode 100644 index 000000000..b64de297f --- /dev/null +++ b/libs/tqdm/utils.py @@ -0,0 +1,391 @@ +from functools import wraps +import os +from platform import system as _curos +import re +import subprocess +from warnings import warn + +CUR_OS = _curos() +IS_WIN = CUR_OS in ['Windows', 'cli'] +IS_NIX = (not IS_WIN) and any( + CUR_OS.startswith(i) for i in + ['CYGWIN', 'MSYS', 'Linux', 'Darwin', 'SunOS', + 'FreeBSD', 'NetBSD', 'OpenBSD']) +RE_ANSI = re.compile(r"\x1b\[[;\d]*[A-Za-z]") + + +# Py2/3 compat. Empty conditional to avoid coverage +if True: # pragma: no cover + try: + _range = xrange + except NameError: + _range = range + + try: + _unich = unichr + except NameError: + _unich = chr + + try: + _unicode = unicode + except NameError: + _unicode = str + + try: + if IS_WIN: + import colorama + else: + raise ImportError + except ImportError: + colorama = None + else: + try: + colorama.init(strip=False) + except TypeError: + colorama.init() + + try: + from weakref import WeakSet + except ImportError: + WeakSet = set + + try: + _basestring = basestring + except NameError: + _basestring = str + + try: # py>=2.7,>=3.1 + from collections import OrderedDict as _OrderedDict + except ImportError: + try: # older Python versions with backported ordereddict lib + from ordereddict import OrderedDict as _OrderedDict + except ImportError: # older Python versions without ordereddict lib + # Py2.6,3.0 compat, from PEP 372 + from collections import MutableMapping + + class _OrderedDict(dict, MutableMapping): + # Methods with direct access to underlying attributes + def __init__(self, *args, **kwds): + if len(args) > 1: + raise TypeError('expected at 1 argument, got %d', + len(args)) + if not hasattr(self, '_keys'): + self._keys = [] + self.update(*args, **kwds) + + def clear(self): + del self._keys[:] + dict.clear(self) + + def __setitem__(self, key, value): + if key not in self: + self._keys.append(key) + dict.__setitem__(self, key, value) + + def __delitem__(self, key): + dict.__delitem__(self, key) + self._keys.remove(key) + + def __iter__(self): + return iter(self._keys) + + def __reversed__(self): + return reversed(self._keys) + + def popitem(self): + if not self: + raise KeyError + key = self._keys.pop() + value = dict.pop(self, key) + return key, value + + def __reduce__(self): + items = [[k, self[k]] for k in self] + inst_dict = vars(self).copy() + inst_dict.pop('_keys', None) + return self.__class__, (items,), inst_dict + + # Methods with indirect access via the above methods + setdefault = MutableMapping.setdefault + update = MutableMapping.update + pop = MutableMapping.pop + keys = MutableMapping.keys + values = MutableMapping.values + items = MutableMapping.items + + def __repr__(self): + pairs = ', '.join(map('%r: %r'.__mod__, self.items())) + return '%s({%s})' % (self.__class__.__name__, pairs) + + def copy(self): + return self.__class__(self) + + @classmethod + def fromkeys(cls, iterable, value=None): + d = cls() + for key in iterable: + d[key] = value + return d + + +class FormatReplace(object): + """ + >>> a = FormatReplace('something') + >>> "{:5d}".format(a) + 'something' + """ + def __init__(self, replace=''): + self.replace = replace + self.format_called = 0 + + def __format__(self, _): + self.format_called += 1 + return self.replace + + +class Comparable(object): + """Assumes child has self._comparable attr/@property""" + def __lt__(self, other): + return self._comparable < other._comparable + + def __le__(self, other): + return (self < other) or (self == other) + + def __eq__(self, other): + return self._comparable == other._comparable + + def __ne__(self, other): + return not self == other + + def __gt__(self, other): + return not self <= other + + def __ge__(self, other): + return not self < other + + +class ObjectWrapper(object): + def __getattr__(self, name): + return getattr(self._wrapped, name) + + def __setattr__(self, name, value): + return setattr(self._wrapped, name, value) + + def wrapper_getattr(self, name): + """Actual `self.getattr` rather than self._wrapped.getattr""" + try: + return object.__getattr__(self, name) + except AttributeError: # py2 + return getattr(self, name) + + def wrapper_setattr(self, name, value): + """Actual `self.setattr` rather than self._wrapped.setattr""" + return object.__setattr__(self, name, value) + + def __init__(self, wrapped): + """ + Thin wrapper around a given object + """ + self.wrapper_setattr('_wrapped', wrapped) + + +class SimpleTextIOWrapper(ObjectWrapper): + """ + Change only `.write()` of the wrapped object by encoding the passed + value and passing the result to the wrapped object's `.write()` method. + """ + # pylint: disable=too-few-public-methods + def __init__(self, wrapped, encoding): + super(SimpleTextIOWrapper, self).__init__(wrapped) + self.wrapper_setattr('encoding', encoding) + + def write(self, s): + """ + Encode `s` and pass to the wrapped object's `.write()` method. + """ + return self._wrapped.write(s.encode(self.wrapper_getattr('encoding'))) + + def __eq__(self, other): + return self._wrapped == getattr(other, '_wrapped', other) + + +class CallbackIOWrapper(ObjectWrapper): + def __init__(self, callback, stream, method="read"): + """ + Wrap a given `file`-like object's `read()` or `write()` to report + lengths to the given `callback` + """ + super(CallbackIOWrapper, self).__init__(stream) + func = getattr(stream, method) + if method == "write": + @wraps(func) + def write(data, *args, **kwargs): + res = func(data, *args, **kwargs) + callback(len(data)) + return res + self.wrapper_setattr('write', write) + elif method == "read": + @wraps(func) + def read(*args, **kwargs): + data = func(*args, **kwargs) + callback(len(data)) + return data + self.wrapper_setattr('read', read) + else: + raise KeyError("Can only wrap read/write methods") + + +def _is_utf(encoding): + try: + u'\u2588\u2589'.encode(encoding) + except UnicodeEncodeError: # pragma: no cover + return False + except Exception: # pragma: no cover + try: + return encoding.lower().startswith('utf-') or ('U8' == encoding) + except: + return False + else: + return True + + +def _supports_unicode(fp): + try: + return _is_utf(fp.encoding) + except AttributeError: + return False + + +def _is_ascii(s): + if isinstance(s, str): + for c in s: + if ord(c) > 255: + return False + return True + return _supports_unicode(s) + + +def _screen_shape_wrapper(): # pragma: no cover + """ + Return a function which returns console dimensions (width, height). + Supported: linux, osx, windows, cygwin. + """ + _screen_shape = None + if IS_WIN: + _screen_shape = _screen_shape_windows + if _screen_shape is None: + _screen_shape = _screen_shape_tput + if IS_NIX: + _screen_shape = _screen_shape_linux + return _screen_shape + + +def _screen_shape_windows(fp): # pragma: no cover + try: + from ctypes import windll, create_string_buffer + import struct + from sys import stdin, stdout + + io_handle = -12 # assume stderr + if fp == stdin: + io_handle = -10 + elif fp == stdout: + io_handle = -11 + + h = windll.kernel32.GetStdHandle(io_handle) + csbi = create_string_buffer(22) + res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi) + if res: + (_bufx, _bufy, _curx, _cury, _wattr, left, top, right, bottom, + _maxx, _maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw) + return right - left, bottom - top # +1 + except: + pass + return None, None + + +def _screen_shape_tput(*_): # pragma: no cover + """cygwin xterm (windows)""" + try: + import shlex + return [int(subprocess.check_call(shlex.split('tput ' + i))) - 1 + for i in ('cols', 'lines')] + except: + pass + return None, None + + +def _screen_shape_linux(fp): # pragma: no cover + + try: + from termios import TIOCGWINSZ + from fcntl import ioctl + from array import array + except ImportError: + return None + else: + try: + rows, cols = array('h', ioctl(fp, TIOCGWINSZ, '\0' * 8))[:2] + return cols, rows + except: + try: + return [int(os.environ[i]) - 1 for i in ("COLUMNS", "LINES")] + except KeyError: + return None, None + + +def _environ_cols_wrapper(): # pragma: no cover + """ + Return a function which returns console width. + Supported: linux, osx, windows, cygwin. + """ + warn("Use `_screen_shape_wrapper()(file)[0]` instead of" + " `_environ_cols_wrapper()(file)`", DeprecationWarning, stacklevel=2) + shape = _screen_shape_wrapper() + if not shape: + return None + + @wraps(shape) + def inner(fp): + return shape(fp)[0] + + return inner + + +def _term_move_up(): # pragma: no cover + return '' if (os.name == 'nt') and (colorama is None) else '\x1b[A' + + +try: + # TODO consider using wcswidth third-party package for 0-width characters + from unicodedata import east_asian_width +except ImportError: + _text_width = len +else: + def _text_width(s): + return sum( + 2 if east_asian_width(ch) in 'FW' else 1 for ch in _unicode(s)) + + +def disp_len(data): + """ + Returns the real on-screen length of a string which may contain + ANSI control codes and wide chars. + """ + return _text_width(RE_ANSI.sub('', data)) + + +def disp_trim(data, length): + """ + Trim a string which may contain ANSI control characters. + """ + if len(data) == disp_len(data): + return data[:length] + + ansi_present = bool(RE_ANSI.search(data)) + while disp_len(data) > length: # carefully delete one char at a time + data = data[:-1] + if ansi_present and bool(RE_ANSI.search(data)): + # assume ANSI reset is required + return data if data.endswith("\033[0m") else data + "\033[0m" + return data |