summaryrefslogtreecommitdiffhomepage
path: root/libs/tqdm/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'libs/tqdm/utils.py')
-rw-r--r--libs/tqdm/utils.py227
1 files changed, 95 insertions, 132 deletions
diff --git a/libs/tqdm/utils.py b/libs/tqdm/utils.py
index b64de297f..0632b8dd0 100644
--- a/libs/tqdm/utils.py
+++ b/libs/tqdm/utils.py
@@ -1,131 +1,51 @@
-from functools import wraps
+"""
+General helpers required for `tqdm.std`.
+"""
import os
-from platform import system as _curos
import re
-import subprocess
+import sys
+from functools import wraps
from warnings import warn
+from weakref import proxy
-CUR_OS = _curos()
-IS_WIN = CUR_OS in ['Windows', 'cli']
-IS_NIX = (not IS_WIN) and any(
- CUR_OS.startswith(i) for i in
- ['CYGWIN', 'MSYS', 'Linux', 'Darwin', 'SunOS',
- 'FreeBSD', 'NetBSD', 'OpenBSD'])
-RE_ANSI = re.compile(r"\x1b\[[;\d]*[A-Za-z]")
+# py2/3 compat
+try:
+ _range = xrange
+except NameError:
+ _range = range
+try:
+ _unich = unichr
+except NameError:
+ _unich = chr
-# Py2/3 compat. Empty conditional to avoid coverage
-if True: # pragma: no cover
- try:
- _range = xrange
- except NameError:
- _range = range
+try:
+ _unicode = unicode
+except NameError:
+ _unicode = str
- try:
- _unich = unichr
- except NameError:
- _unich = chr
+try:
+ _basestring = basestring
+except NameError:
+ _basestring = str
- try:
- _unicode = unicode
- except NameError:
- _unicode = str
+CUR_OS = sys.platform
+IS_WIN = any(CUR_OS.startswith(i) for i in ['win32', 'cygwin'])
+IS_NIX = any(CUR_OS.startswith(i) for i in ['aix', 'linux', 'darwin'])
+RE_ANSI = re.compile(r"\x1b\[[;\d]*[A-Za-z]")
- try:
- if IS_WIN:
- import colorama
- else:
- raise ImportError
- except ImportError:
- colorama = None
+try:
+ if IS_WIN:
+ import colorama
else:
- try:
- colorama.init(strip=False)
- except TypeError:
- colorama.init()
-
- try:
- from weakref import WeakSet
- except ImportError:
- WeakSet = set
-
+ raise ImportError
+except ImportError:
+ colorama = None
+else:
try:
- _basestring = basestring
- except NameError:
- _basestring = str
-
- try: # py>=2.7,>=3.1
- from collections import OrderedDict as _OrderedDict
- except ImportError:
- try: # older Python versions with backported ordereddict lib
- from ordereddict import OrderedDict as _OrderedDict
- except ImportError: # older Python versions without ordereddict lib
- # Py2.6,3.0 compat, from PEP 372
- from collections import MutableMapping
-
- class _OrderedDict(dict, MutableMapping):
- # Methods with direct access to underlying attributes
- def __init__(self, *args, **kwds):
- if len(args) > 1:
- raise TypeError('expected at 1 argument, got %d',
- len(args))
- if not hasattr(self, '_keys'):
- self._keys = []
- self.update(*args, **kwds)
-
- def clear(self):
- del self._keys[:]
- dict.clear(self)
-
- def __setitem__(self, key, value):
- if key not in self:
- self._keys.append(key)
- dict.__setitem__(self, key, value)
-
- def __delitem__(self, key):
- dict.__delitem__(self, key)
- self._keys.remove(key)
-
- def __iter__(self):
- return iter(self._keys)
-
- def __reversed__(self):
- return reversed(self._keys)
-
- def popitem(self):
- if not self:
- raise KeyError
- key = self._keys.pop()
- value = dict.pop(self, key)
- return key, value
-
- def __reduce__(self):
- items = [[k, self[k]] for k in self]
- inst_dict = vars(self).copy()
- inst_dict.pop('_keys', None)
- return self.__class__, (items,), inst_dict
-
- # Methods with indirect access via the above methods
- setdefault = MutableMapping.setdefault
- update = MutableMapping.update
- pop = MutableMapping.pop
- keys = MutableMapping.keys
- values = MutableMapping.values
- items = MutableMapping.items
-
- def __repr__(self):
- pairs = ', '.join(map('%r: %r'.__mod__, self.items()))
- return '%s({%s})' % (self.__class__.__name__, pairs)
-
- def copy(self):
- return self.__class__(self)
-
- @classmethod
- def fromkeys(cls, iterable, value=None):
- d = cls()
- for key in iterable:
- d[key] = value
- return d
+ colorama.init(strip=False)
+ except TypeError:
+ colorama.init()
class FormatReplace(object):
@@ -133,7 +53,7 @@ class FormatReplace(object):
>>> a = FormatReplace('something')
>>> "{:5d}".format(a)
'something'
- """
+ """ # NOQA: P102
def __init__(self, replace=''):
self.replace = replace
self.format_called = 0
@@ -209,6 +129,49 @@ class SimpleTextIOWrapper(ObjectWrapper):
return self._wrapped == getattr(other, '_wrapped', other)
+class DisableOnWriteError(ObjectWrapper):
+ """
+ Disable the given `tqdm_instance` upon `write()` or `flush()` errors.
+ """
+ @staticmethod
+ def disable_on_exception(tqdm_instance, func):
+ """
+ Quietly set `tqdm_instance.miniters=inf` if `func` raises `errno=5`.
+ """
+ tqdm_instance = proxy(tqdm_instance)
+
+ def inner(*args, **kwargs):
+ try:
+ return func(*args, **kwargs)
+ except OSError as e:
+ if e.errno != 5:
+ raise
+ try:
+ tqdm_instance.miniters = float('inf')
+ except ReferenceError:
+ pass
+ except ValueError as e:
+ if 'closed' not in str(e):
+ raise
+ try:
+ tqdm_instance.miniters = float('inf')
+ except ReferenceError:
+ pass
+ return inner
+
+ def __init__(self, wrapped, tqdm_instance):
+ super(DisableOnWriteError, self).__init__(wrapped)
+ if hasattr(wrapped, 'write'):
+ self.wrapper_setattr(
+ 'write', self.disable_on_exception(tqdm_instance, wrapped.write))
+ if hasattr(wrapped, 'flush'):
+ self.wrapper_setattr(
+ 'flush', self.disable_on_exception(tqdm_instance, wrapped.flush))
+
+ def __eq__(self, other):
+ return self._wrapped == getattr(other, '_wrapped', other)
+
+
class CallbackIOWrapper(ObjectWrapper):
def __init__(self, callback, stream, method="read"):
"""
@@ -238,12 +201,12 @@ class CallbackIOWrapper(ObjectWrapper):
def _is_utf(encoding):
try:
u'\u2588\u2589'.encode(encoding)
- except UnicodeEncodeError: # pragma: no cover
+ except UnicodeEncodeError:
return False
- except Exception: # pragma: no cover
+ except Exception:
try:
return encoding.lower().startswith('utf-') or ('U8' == encoding)
- except:
+ except Exception:
return False
else:
return True
@@ -282,8 +245,8 @@ def _screen_shape_wrapper(): # pragma: no cover
def _screen_shape_windows(fp): # pragma: no cover
try:
- from ctypes import windll, create_string_buffer
import struct
+ from ctypes import create_string_buffer, windll
from sys import stdin, stdout
io_handle = -12 # assume stderr
@@ -299,7 +262,7 @@ def _screen_shape_windows(fp): # pragma: no cover
(_bufx, _bufy, _curx, _cury, _wattr, left, top, right, bottom,
_maxx, _maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw)
return right - left, bottom - top # +1
- except:
+ except Exception: # nosec
pass
return None, None
@@ -308,9 +271,10 @@ def _screen_shape_tput(*_): # pragma: no cover
"""cygwin xterm (windows)"""
try:
import shlex
- return [int(subprocess.check_call(shlex.split('tput ' + i))) - 1
+ from subprocess import check_call # nosec
+ return [int(check_call(shlex.split('tput ' + i))) - 1
for i in ('cols', 'lines')]
- except:
+ except Exception: # nosec
pass
return None, None
@@ -318,19 +282,19 @@ def _screen_shape_tput(*_): # pragma: no cover
def _screen_shape_linux(fp): # pragma: no cover
try:
- from termios import TIOCGWINSZ
- from fcntl import ioctl
from array import array
+ from fcntl import ioctl
+ from termios import TIOCGWINSZ
except ImportError:
- return None
+ return None, None
else:
try:
rows, cols = array('h', ioctl(fp, TIOCGWINSZ, '\0' * 8))[:2]
return cols, rows
- except:
+ except Exception:
try:
return [int(os.environ[i]) - 1 for i in ("COLUMNS", "LINES")]
- except KeyError:
+ except (KeyError, ValueError):
return None, None
@@ -363,8 +327,7 @@ except ImportError:
_text_width = len
else:
def _text_width(s):
- return sum(
- 2 if east_asian_width(ch) in 'FW' else 1 for ch in _unicode(s))
+ return sum(2 if east_asian_width(ch) in 'FW' else 1 for ch in _unicode(s))
def disp_len(data):