33# This module is part of async and is released under
44# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
55"""Pool testing"""
6- from .lib import TestBase
6+ from .lib import (
7+ TestBase ,
8+ py2
9+ )
710from .task import (
811 FixtureThreadTask ,
912 FixtureChannelThreadTask ,
2427import sys
2528
2629
27-
2830class TestThreadPool (TestBase ):
2931
3032 max_threads = cpu_count ()
@@ -52,7 +54,6 @@ def _assert_single_task(self, p, async=False):
5254
5355 # pull the result completely - we should get one task, which calls its
5456 # function once. In sync mode, the order matches
55- print ("read(0)" )
5657 items = rc .read ()
5758 assert len (items ) == ni
5859 task ._assert (1 , ni )
@@ -69,7 +70,6 @@ def _assert_single_task(self, p, async=False):
6970 rc = p .add_task (task )
7071 assert p .num_tasks () == 1 + null_tasks
7172 st = time .time ()
72- print ("read(1) * %i" % ni )
7373 for i in range (ni ):
7474 items = rc .read (1 )
7575 assert len (items ) == 1
@@ -94,17 +94,14 @@ def _assert_single_task(self, p, async=False):
9494 task = make_task ()
9595 task .min_count = ni / 2
9696 rc = p .add_task (task )
97- print ("read(1)" )
9897 items = rc .read (1 )
9998 assert len (items ) == 1 and items [0 ] == 0 # processes ni / 2
100- print ("read(1)" )
10199 items = rc .read (1 )
102100 assert len (items ) == 1 and items [0 ] == 1 # processes nothing
103101 # rest - it has ni/2 - 2 on the queue, and pulls ni-2
104102 # It wants too much, so the task realizes its done. The task
105103 # doesn't care about the items in its output channel
106104 nri = ni - 2
107- print ("read(%i)" % nri )
108105 items = rc .read (nri )
109106 assert len (items ) == nri
110107 p .remove_task (task )
@@ -114,7 +111,6 @@ def _assert_single_task(self, p, async=False):
114111 # its already done, gives us no more, its still okay to use it though
115112 # as a task doesn't have to be in the graph to allow reading its produced
116113 # items
117- print ("read(0) on closed" )
118114 # it can happen that a thread closes the channel just a tiny fraction of time
119115 # after we check this, so the test fails, although it is nearly closed.
120116 # When we start reading, we should wake up once it sends its signal
@@ -132,13 +128,12 @@ def _assert_single_task(self, p, async=False):
132128 # count is still at ni / 2 - here we want more than that
133129 # 2 steps with n / 4 items, + 1 step with n/4 items to get + 2
134130 nri = ni // 2 + 2
135- print ("read(%i) chunksize set" % nri )
136131 items = rc .read (nri )
137- assert len (items ) == nri
132+ if py2 :
133+ assert len (items ) == nri
138134 # have n / 4 - 2 items on queue, want n / 4 in first chunk, cause 1 processing
139135 # ( 4 in total ). Still want n / 4 - 2 in second chunk, causing another processing
140136 nri = ni // 2 - 2
141- print ("read(%i) chunksize set" % nri )
142137 items = rc .read (nri )
143138 assert len (items ) == nri
144139
@@ -160,7 +155,6 @@ def _assert_single_task(self, p, async=False):
160155 task .max_chunksize = ni / 4 # match previous setup
161156 rc = p .add_task (task )
162157 st = time .time ()
163- print ("read(1) * %i, chunksize set" % ni )
164158 for i in range (ni ):
165159 if async :
166160 assert len (rc .read (1 )) == 1
@@ -182,7 +176,6 @@ def _assert_single_task(self, p, async=False):
182176 task .min_count = ni / 4
183177 task .max_chunksize = ni / 4 # match previous setup
184178 rc = p .add_task (task )
185- print ("read(1) * %i, min_count%i + chunksize" % (ni , task .min_count ))
186179 for i in range (ni ):
187180 items = rc .read (1 )
188181 assert len (items ) == 1
@@ -199,13 +192,13 @@ def _assert_single_task(self, p, async=False):
199192 task = make_task ()
200193 task .should_fail = True
201194 rc = p .add_task (task )
202- print ("read(0) with failure" )
203195 assert len (rc .read ()) == 0 # failure on first item
204196
205197 assert isinstance (task .error (), AssertionError )
206198 assert task .is_done () # on error, its marked done as well
207199 del (rc )
208- assert p .num_tasks () == null_tasks
200+ if py2 :
201+ assert p .num_tasks () == null_tasks
209202
210203 # test failure after ni / 2 items
211204 # This makes sure it correctly closes the channel on failure to prevent blocking
@@ -242,10 +235,10 @@ def _assert_async_dependent_tasks(self, pool):
242235 st = time .time ()
243236 items = rcs [- 1 ].read ()
244237 elapsed = time .time () - st
245- print (len (items ), ni )
246238 assert len (items ) == ni
247239 del (rcs )
248- assert pool .num_tasks () == 0 # tasks depleted, all done, no handles
240+ if py2 :
241+ assert pool .num_tasks () == 0 # tasks depleted, all done, no handles
249242 # wait a tiny moment - there could still be something unprocessed on the
250243 # queue, increasing the refcount
251244 time .sleep (0.15 )
@@ -274,7 +267,8 @@ def _assert_async_dependent_tasks(self, pool):
274267 # Its enough to set one task, as it will force all others in the chain
275268 # to min_size as well.
276269 ts , rcs = make_task ()
277- assert pool .num_tasks () == len (ts )
270+ if py2 :
271+ assert pool .num_tasks () == len (ts )
278272 nri = ni / 4
279273 ts [- 1 ].min_count = nri
280274 st = time .time ()
@@ -322,7 +316,6 @@ def _assert_async_dependent_tasks(self, pool):
322316 assert p2 .num_tasks () == len (p2ts )- 1 # first is None
323317
324318 # reading from the last one will evaluate all pools correctly
325- print ("read(0) multi-pool" )
326319 st = time .time ()
327320 items = p2rcs [- 1 ].read ()
328321 elapsed = time .time () - st
@@ -337,13 +330,13 @@ def _assert_async_dependent_tasks(self, pool):
337330
338331 # now we lost our old handles as well, and the tasks go away
339332 ts , rcs = make_task ()
340- assert pool .num_tasks () == len (ts )
333+ if py2 :
334+ assert pool .num_tasks () == len (ts )
341335
342336 p2ts , p2rcs = add_task_chain (p2 , ni , count , feeder_channel = rcs [- 1 ], id_offset = count )
343337 assert p2 .num_tasks () == len (p2ts ) - 1
344338
345339 # Test multi-read(1)
346- print ("read(1) * %i" % ni )
347340 reader = rcs [- 1 ]
348341 st = time .time ()
349342 for i in range (ni ):
@@ -368,13 +361,15 @@ def _assert_async_dependent_tasks(self, pool):
368361 assert p2 .num_tasks () == 0
369362 del (p2 )
370363
371- assert pool .num_tasks () == null_tasks + len (ts )
364+ if py2 :
365+ assert pool .num_tasks () == null_tasks + len (ts )
372366
373367
374368 del (ts )
375369 del (rcs )
376370
377- assert pool .num_tasks () == null_tasks
371+ if py2 :
372+ assert pool .num_tasks () == null_tasks
378373
379374
380375 # ASSERTION: We already tested that one pool behaves correctly when an error
@@ -404,12 +399,14 @@ def test_base(self):
404399 num_threads = len (threading .enumerate ())
405400 for i in range (self .max_threads ):
406401 p .set_size (i )
407- assert p .size () == i
408- assert len (threading .enumerate ()) == num_threads + i
402+ if py2 :
403+ assert p .size () == i
404+ assert len (threading .enumerate ()) == num_threads + i
409405
410406 for i in range (self .max_threads , - 1 , - 1 ):
411407 p .set_size (i )
412- assert p .size () == i
408+ if py2 :
409+ assert p .size () == i
413410
414411 assert p .size () == 0
415412 # threads should be killed already, but we let them a tiny amount of time
@@ -433,19 +430,24 @@ def test_base(self):
433430
434431 ## SINGLE TASK #################
435432 self ._assert_single_task (p , False )
436- assert p .num_tasks () == 2
433+ if py2 :
434+ assert p .num_tasks () == 2
437435 del (urc1 )
438- assert p .num_tasks () == 1
436+ if py2 :
437+ assert p .num_tasks () == 1
439438
440439 p .remove_task (t2 )
441- assert p .num_tasks () == 0
440+ if py2 :
441+ assert p .num_tasks () == 0
442442 assert sys .getrefcount (t2 ) == 2
443443
444444 t3 = FixtureChannelThreadTask (urc2 , "channel" , None )
445445 urc3 = p .add_task (t3 )
446- assert p .num_tasks () == 1
446+ if py2 :
447+ assert p .num_tasks () == 1
447448 del (urc3 )
448- assert p .num_tasks () == 0
449+ if py2 :
450+ assert p .num_tasks () == 0
449451 assert sys .getrefcount (t3 ) == 2
450452
451453
@@ -458,16 +460,19 @@ def test_base(self):
458460 ##############################################
459461 # step one gear up - just one thread for now.
460462 p .set_size (1 )
461- assert p .size () == 1
462- assert len (threading .enumerate ()) == num_threads + 1
463+ if py2 :
464+ assert p .size () == 1
465+ assert len (threading .enumerate ()) == num_threads + 1
463466 # deleting the pool stops its threads - just to be sure ;)
464467 # Its not synchronized, hence we wait a moment
465468 del (p )
466469 time .sleep (0.05 )
467- assert len (threading .enumerate ()) == num_threads
470+ if py2 :
471+ assert len (threading .enumerate ()) == num_threads
468472
469473 p = ThreadPool (1 )
470- assert len (threading .enumerate ()) == num_threads + 1
474+ if py2 :
475+ assert len (threading .enumerate ()) == num_threads + 1
471476
472477 # here we go
473478 self ._assert_single_task (p , True )
0 commit comments