diff options
Diffstat (limited to 'libs/tqdm/utils.py')
-rw-r--r-- | libs/tqdm/utils.py | 227 |
1 files changed, 95 insertions, 132 deletions
diff --git a/libs/tqdm/utils.py b/libs/tqdm/utils.py index b64de297f..0632b8dd0 100644 --- a/libs/tqdm/utils.py +++ b/libs/tqdm/utils.py @@ -1,131 +1,51 @@ -from functools import wraps +""" +General helpers required for `tqdm.std`. +""" import os -from platform import system as _curos import re -import subprocess +import sys +from functools import wraps from warnings import warn +from weakref import proxy -CUR_OS = _curos() -IS_WIN = CUR_OS in ['Windows', 'cli'] -IS_NIX = (not IS_WIN) and any( - CUR_OS.startswith(i) for i in - ['CYGWIN', 'MSYS', 'Linux', 'Darwin', 'SunOS', - 'FreeBSD', 'NetBSD', 'OpenBSD']) -RE_ANSI = re.compile(r"\x1b\[[;\d]*[A-Za-z]") +# py2/3 compat +try: + _range = xrange +except NameError: + _range = range +try: + _unich = unichr +except NameError: + _unich = chr -# Py2/3 compat. Empty conditional to avoid coverage -if True: # pragma: no cover - try: - _range = xrange - except NameError: - _range = range +try: + _unicode = unicode +except NameError: + _unicode = str - try: - _unich = unichr - except NameError: - _unich = chr +try: + _basestring = basestring +except NameError: + _basestring = str - try: - _unicode = unicode - except NameError: - _unicode = str +CUR_OS = sys.platform +IS_WIN = any(CUR_OS.startswith(i) for i in ['win32', 'cygwin']) +IS_NIX = any(CUR_OS.startswith(i) for i in ['aix', 'linux', 'darwin']) +RE_ANSI = re.compile(r"\x1b\[[;\d]*[A-Za-z]") - try: - if IS_WIN: - import colorama - else: - raise ImportError - except ImportError: - colorama = None +try: + if IS_WIN: + import colorama else: - try: - colorama.init(strip=False) - except TypeError: - colorama.init() - - try: - from weakref import WeakSet - except ImportError: - WeakSet = set - + raise ImportError +except ImportError: + colorama = None +else: try: - _basestring = basestring - except NameError: - _basestring = str - - try: # py>=2.7,>=3.1 - from collections import OrderedDict as _OrderedDict - except ImportError: - try: # older Python versions with backported ordereddict lib - from ordereddict import OrderedDict as _OrderedDict - except ImportError: # older Python versions without ordereddict lib - # Py2.6,3.0 compat, from PEP 372 - from collections import MutableMapping - - class _OrderedDict(dict, MutableMapping): - # Methods with direct access to underlying attributes - def __init__(self, *args, **kwds): - if len(args) > 1: - raise TypeError('expected at 1 argument, got %d', - len(args)) - if not hasattr(self, '_keys'): - self._keys = [] - self.update(*args, **kwds) - - def clear(self): - del self._keys[:] - dict.clear(self) - - def __setitem__(self, key, value): - if key not in self: - self._keys.append(key) - dict.__setitem__(self, key, value) - - def __delitem__(self, key): - dict.__delitem__(self, key) - self._keys.remove(key) - - def __iter__(self): - return iter(self._keys) - - def __reversed__(self): - return reversed(self._keys) - - def popitem(self): - if not self: - raise KeyError - key = self._keys.pop() - value = dict.pop(self, key) - return key, value - - def __reduce__(self): - items = [[k, self[k]] for k in self] - inst_dict = vars(self).copy() - inst_dict.pop('_keys', None) - return self.__class__, (items,), inst_dict - - # Methods with indirect access via the above methods - setdefault = MutableMapping.setdefault - update = MutableMapping.update - pop = MutableMapping.pop - keys = MutableMapping.keys - values = MutableMapping.values - items = MutableMapping.items - - def __repr__(self): - pairs = ', '.join(map('%r: %r'.__mod__, self.items())) - return '%s({%s})' % (self.__class__.__name__, pairs) - - def copy(self): - return self.__class__(self) - - @classmethod - def fromkeys(cls, iterable, value=None): - d = cls() - for key in iterable: - d[key] = value - return d + colorama.init(strip=False) + except TypeError: + colorama.init() class FormatReplace(object): @@ -133,7 +53,7 @@ class FormatReplace(object): >>> a = FormatReplace('something') >>> "{:5d}".format(a) 'something' - """ + """ # NOQA: P102 def __init__(self, replace=''): self.replace = replace self.format_called = 0 @@ -209,6 +129,49 @@ class SimpleTextIOWrapper(ObjectWrapper): return self._wrapped == getattr(other, '_wrapped', other) +class DisableOnWriteError(ObjectWrapper): + """ + Disable the given `tqdm_instance` upon `write()` or `flush()` errors. + """ + @staticmethod + def disable_on_exception(tqdm_instance, func): + """ + Quietly set `tqdm_instance.miniters=inf` if `func` raises `errno=5`. + """ + tqdm_instance = proxy(tqdm_instance) + + def inner(*args, **kwargs): + try: + return func(*args, **kwargs) + except OSError as e: + if e.errno != 5: + raise + try: + tqdm_instance.miniters = float('inf') + except ReferenceError: + pass + except ValueError as e: + if 'closed' not in str(e): + raise + try: + tqdm_instance.miniters = float('inf') + except ReferenceError: + pass + return inner + + def __init__(self, wrapped, tqdm_instance): + super(DisableOnWriteError, self).__init__(wrapped) + if hasattr(wrapped, 'write'): + self.wrapper_setattr( + 'write', self.disable_on_exception(tqdm_instance, wrapped.write)) + if hasattr(wrapped, 'flush'): + self.wrapper_setattr( + 'flush', self.disable_on_exception(tqdm_instance, wrapped.flush)) + + def __eq__(self, other): + return self._wrapped == getattr(other, '_wrapped', other) + + class CallbackIOWrapper(ObjectWrapper): def __init__(self, callback, stream, method="read"): """ @@ -238,12 +201,12 @@ class CallbackIOWrapper(ObjectWrapper): def _is_utf(encoding): try: u'\u2588\u2589'.encode(encoding) - except UnicodeEncodeError: # pragma: no cover + except UnicodeEncodeError: return False - except Exception: # pragma: no cover + except Exception: try: return encoding.lower().startswith('utf-') or ('U8' == encoding) - except: + except Exception: return False else: return True @@ -282,8 +245,8 @@ def _screen_shape_wrapper(): # pragma: no cover def _screen_shape_windows(fp): # pragma: no cover try: - from ctypes import windll, create_string_buffer import struct + from ctypes import create_string_buffer, windll from sys import stdin, stdout io_handle = -12 # assume stderr @@ -299,7 +262,7 @@ def _screen_shape_windows(fp): # pragma: no cover (_bufx, _bufy, _curx, _cury, _wattr, left, top, right, bottom, _maxx, _maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw) return right - left, bottom - top # +1 - except: + except Exception: # nosec pass return None, None @@ -308,9 +271,10 @@ def _screen_shape_tput(*_): # pragma: no cover """cygwin xterm (windows)""" try: import shlex - return [int(subprocess.check_call(shlex.split('tput ' + i))) - 1 + from subprocess import check_call # nosec + return [int(check_call(shlex.split('tput ' + i))) - 1 for i in ('cols', 'lines')] - except: + except Exception: # nosec pass return None, None @@ -318,19 +282,19 @@ def _screen_shape_tput(*_): # pragma: no cover def _screen_shape_linux(fp): # pragma: no cover try: - from termios import TIOCGWINSZ - from fcntl import ioctl from array import array + from fcntl import ioctl + from termios import TIOCGWINSZ except ImportError: - return None + return None, None else: try: rows, cols = array('h', ioctl(fp, TIOCGWINSZ, '\0' * 8))[:2] return cols, rows - except: + except Exception: try: return [int(os.environ[i]) - 1 for i in ("COLUMNS", "LINES")] - except KeyError: + except (KeyError, ValueError): return None, None @@ -363,8 +327,7 @@ except ImportError: _text_width = len else: def _text_width(s): - return sum( - 2 if east_asian_width(ch) in 'FW' else 1 for ch in _unicode(s)) + return sum(2 if east_asian_width(ch) in 'FW' else 1 for ch in _unicode(s)) def disp_len(data): |