summaryrefslogtreecommitdiffhomepage
path: root/libs/tqdm/tests/tests_synchronisation.py
diff options
context:
space:
mode:
Diffstat (limited to 'libs/tqdm/tests/tests_synchronisation.py')
-rw-r--r--libs/tqdm/tests/tests_synchronisation.py213
1 files changed, 0 insertions, 213 deletions
diff --git a/libs/tqdm/tests/tests_synchronisation.py b/libs/tqdm/tests/tests_synchronisation.py
deleted file mode 100644
index 34f682a58..000000000
--- a/libs/tqdm/tests/tests_synchronisation.py
+++ /dev/null
@@ -1,213 +0,0 @@
-from __future__ import division
-from tqdm import tqdm, trange, TMonitor
-from tests_tqdm import with_setup, pretest, posttest, SkipTest, \
- StringIO, closing
-from tests_tqdm import DiscreteTimer, cpu_timify
-from tests_perf import retry_on_except
-
-import sys
-from time import sleep
-from threading import Event
-
-
-class FakeSleep(object):
- """Wait until the discrete timer reached the required time"""
- def __init__(self, dtimer):
- self.dtimer = dtimer
-
- def sleep(self, t):
- end = t + self.dtimer.t
- while self.dtimer.t < end:
- sleep(0.0000001) # sleep a bit to interrupt (instead of pass)
-
-
-class FakeTqdm(object):
- _instances = []
-
-
-def make_create_fake_sleep_event(sleep):
- def wait(self, timeout=None):
- if timeout is not None:
- sleep(timeout)
- return self.is_set()
-
- def create_fake_sleep_event():
- event = Event()
- event.wait = wait
- return event
-
- return create_fake_sleep_event
-
-
-def incr(x):
- return x + 1
-
-
-def incr_bar(x):
- with closing(StringIO()) as our_file:
- for _ in trange(x, lock_args=(False,), file=our_file):
- pass
- return incr(x)
-
-
-@with_setup(pretest, posttest)
-def test_monitor_thread():
- """Test dummy monitoring thread"""
- maxinterval = 10
-
- # Setup a discrete timer
- timer = DiscreteTimer()
- TMonitor._time = timer.time
- # And a fake sleeper
- sleeper = FakeSleep(timer)
- TMonitor._event = make_create_fake_sleep_event(sleeper.sleep)
-
- # Instanciate the monitor
- monitor = TMonitor(FakeTqdm, maxinterval)
- # Test if alive, then killed
- assert monitor.report()
- monitor.exit()
- timer.sleep(maxinterval * 2) # need to go out of the sleep to die
- assert not monitor.report()
- # assert not monitor.is_alive() # not working dunno why, thread not killed
- del monitor
-
-
-@with_setup(pretest, posttest)
-def test_monitoring_and_cleanup():
- """Test for stalled tqdm instance and monitor deletion"""
- # Note: should fix miniters for these tests, else with dynamic_miniters
- # it's too complicated to handle with monitoring update and maxinterval...
- maxinterval = 2
-
- total = 1000
- # Setup a discrete timer
- timer = DiscreteTimer()
- # And a fake sleeper
- sleeper = FakeSleep(timer)
- # Setup TMonitor to use the timer
- TMonitor._time = timer.time
- TMonitor._event = make_create_fake_sleep_event(sleeper.sleep)
- # Set monitor interval
- tqdm.monitor_interval = maxinterval
- with closing(StringIO()) as our_file:
- with tqdm(total=total, file=our_file, miniters=500, mininterval=0.1,
- maxinterval=maxinterval) as t:
- cpu_timify(t, timer)
- # Do a lot of iterations in a small timeframe
- # (smaller than monitor interval)
- timer.sleep(maxinterval / 2) # monitor won't wake up
- t.update(500)
- # check that our fixed miniters is still there
- assert t.miniters == 500
- # Then do 1 it after monitor interval, so that monitor kicks in
- timer.sleep(maxinterval * 2)
- t.update(1)
- # Wait for the monitor to get out of sleep's loop and update tqdm..
- timeend = timer.time()
- while not (t.monitor.woken >= timeend and t.miniters == 1):
- timer.sleep(1) # Force monitor to wake up if it woken too soon
- sleep(0.000001) # sleep to allow interrupt (instead of pass)
- assert t.miniters == 1 # check that monitor corrected miniters
- # Note: at this point, there may be a race condition: monitor saved
- # current woken time but timer.sleep() happen just before monitor
- # sleep. To fix that, either sleep here or increase time in a loop
- # to ensure that monitor wakes up at some point.
-
- # Try again but already at miniters = 1 so nothing will be done
- timer.sleep(maxinterval * 2)
- t.update(2)
- timeend = timer.time()
- while t.monitor.woken < timeend:
- timer.sleep(1) # Force monitor to wake up if it woken too soon
- sleep(0.000001)
- # Wait for the monitor to get out of sleep's loop and update tqdm..
- assert t.miniters == 1 # check that monitor corrected miniters
-
- # Check that class var monitor is deleted if no instance left
- tqdm.monitor_interval = 10
- assert tqdm.monitor is None
-
-
-@with_setup(pretest, posttest)
-def test_monitoring_multi():
- """Test on multiple bars, one not needing miniters adjustment"""
- # Note: should fix miniters for these tests, else with dynamic_miniters
- # it's too complicated to handle with monitoring update and maxinterval...
- maxinterval = 2
-
- total = 1000
- # Setup a discrete timer
- timer = DiscreteTimer()
- # And a fake sleeper
- sleeper = FakeSleep(timer)
- # Setup TMonitor to use the timer
- TMonitor._time = timer.time
- TMonitor._event = make_create_fake_sleep_event(sleeper.sleep)
- # Set monitor interval
- tqdm.monitor_interval = maxinterval
- with closing(StringIO()) as our_file:
- with tqdm(total=total, file=our_file, miniters=500, mininterval=0.1,
- maxinterval=maxinterval) as t1:
- # Set high maxinterval for t2 so monitor does not need to adjust it
- with tqdm(total=total, file=our_file, miniters=500, mininterval=0.1,
- maxinterval=1E5) as t2:
- cpu_timify(t1, timer)
- cpu_timify(t2, timer)
- # Do a lot of iterations in a small timeframe
- timer.sleep(maxinterval / 2)
- t1.update(500)
- t2.update(500)
- assert t1.miniters == 500
- assert t2.miniters == 500
- # Then do 1 it after monitor interval, so that monitor kicks in
- timer.sleep(maxinterval * 2)
- t1.update(1)
- t2.update(1)
- # Wait for the monitor to get out of sleep and update tqdm
- timeend = timer.time()
- while not (t1.monitor.woken >= timeend and t1.miniters == 1):
- timer.sleep(1)
- sleep(0.000001)
- assert t1.miniters == 1 # check that monitor corrected miniters
- assert t2.miniters == 500 # check that t2 was not adjusted
-
- # Check that class var monitor is deleted if no instance left
- tqdm.monitor_interval = 10
- assert tqdm.monitor is None
-
-
-@with_setup(pretest, posttest)
-def test_imap():
- """Test multiprocessing.Pool"""
- try:
- from multiprocessing import Pool
- except ImportError:
- raise SkipTest
-
- pool = Pool()
- res = list(tqdm(pool.imap(incr, range(100)), disable=True))
- assert res[-1] == 100
-
-
-# py2: locks won't propagate to incr_bar so may cause `AttributeError`
-@retry_on_except(n=3 if sys.version_info < (3,) else 1)
-@with_setup(pretest, posttest)
-def test_threadpool():
- """Test concurrent.futures.ThreadPoolExecutor"""
- try:
- from concurrent.futures import ThreadPoolExecutor
- from threading import RLock
- except ImportError:
- raise SkipTest
-
- tqdm.set_lock(RLock())
- with ThreadPoolExecutor(8) as pool:
- try:
- res = list(tqdm(pool.map(incr_bar, range(100)), disable=True))
- except AttributeError:
- if sys.version_info < (3,):
- raise SkipTest
- else:
- raise
- assert sum(res) == sum(range(1, 101))