本站首页    管理页面    写新日志    退出


«October 2025»
1234
567891011
12131415161718
19202122232425
262728293031


公告
 本博客在此声明所有文章均为转摘,只做资料收集使用。

我的分类(专题)

日志更新

最新评论

留言板

链接

Blog信息
blog名称:
日志总数:1304
评论数量:2242
留言数量:5
访问次数:7623468
建立时间:2006年5月29日




[Python]用 Python 实现的线程池
软件技术

lhwork 发表于 2007/2/1 8:53:26

为了提高程序的效率,经常要用到多线程,尤其是IO等需要等待外部响应的部分。线程的创建、销毁和调度本身是有代价的,如果一个线程的任务相对简单,那这些时间和空间开销就不容忽视了,此时用线程池就是更好的选择,即创建一些线程然后反复利用它们,而不是在完成单个任务后就结束。 下面是用Python实现的通用的线程池代码: undefinedview plaincopy to clipboardprint? import Queue, threading, sys   from threading import Thread    import time,urllib      # working thread    class Worker(Thread):        worker_count = 0        def __init__( self, workQueue, resultQueue, timeout = 0, **kwds):            Thread.__init__( self, **kwds )            self.id = Worker.worker_count            Worker.worker_count += 1            self.setDaemon( True )            self.workQueue = workQueue            self.resultQueue = resultQueue            self.timeout = timeout           def run( self ):            ''' the get-some-work, do-some-work main loop of worker threads '''           while True:                try:                    callable, args, kwds = self.workQueue.get(timeout=self.timeout)                    res = callable(*args, **kwds)                    print "worker[%2d]: %s" % (self.id, str(res) )                    self.resultQueue.put( res )                except Queue.Empty:                    break               except :                    print 'worker[%2d]' % self.id, sys.exc_info()[:2]                        class WorkerManager:        def __init__( self, num_of_workers=10, timeout = 1):            self.workQueue = Queue.Queue()            self.resultQueue = Queue.Queue()            self.workers = []            self.timeout = timeout            self._recruitThreads( num_of_workers )           def _recruitThreads( self, num_of_workers ):            for i in range( num_of_workers ):                worker = Worker( self.workQueue, self.resultQueue, self.timeout )                self.workers.append(worker)           def start(self):            for w in self.workers:                w.start()           def wait_for_complete( self):            # ...then, wait for each of them to terminate:            while len(self.workers):                worker = self.workers.pop()                worker.join( )                if worker.isAlive() and not self.workQueue.empty():                    self.workers.append( worker )            print "All jobs are are completed."           def add_job( self, callable, *args, **kwds ):            self.workQueue.put( (callable, args, kwds) )           def get_result( self, *args, **kwds ):            return self.resultQueue.get( *args, **kwds )      import Queue, threading, sys from threading import Thread import time,urllib # working thread class Worker(Thread): worker_count = 0 def __init__( self, workQueue, resultQueue, timeout = 0, **kwds): Thread.__init__( self, **kwds ) self.id = Worker.worker_count Worker.worker_count += 1 self.setDaemon( True ) self.workQueue = workQueue self.resultQueue = resultQueue self.timeout = timeout def run( self ): ''' the get-some-work, do-some-work main loop of worker threads ''' while True: try: callable, args, kwds = self.workQueue.get(timeout=self.timeout) res = callable(*args, **kwds) print "worker[%2d]: %s" % (self.id, str(res) ) self.resultQueue.put( res ) except Queue.Empty: break except : print 'worker[%2d]' % self.id, sys.exc_info()[:2] class WorkerManager: def __init__( self, num_of_workers=10, timeout = 1): self.workQueue = Queue.Queue() self.resultQueue = Queue.Queue() self.workers = [] self.timeout = timeout self._recruitThreads( num_of_workers ) def _recruitThreads( self, num_of_workers ): for i in range( num_of_workers ): worker = Worker( self.workQueue, self.resultQueue, self.timeout ) self.workers.append(worker) def start(self): for w in self.workers: w.start() def wait_for_complete( self): # ...then, wait for each of them to terminate: while len(self.workers): worker = self.workers.pop() worker.join( ) if worker.isAlive() and not self.workQueue.empty(): self.workers.append( worker ) print "All jobs are are completed." def add_job( self, callable, *args, **kwds ): self.workQueue.put( (callable, args, kwds) ) def get_result( self, *args, **kwds ): return self.resultQueue.get( *args, **kwds ) Worker类是一个工作线程,不断地从workQueue队列中获取需要执行的任务,执行之,并将结果写入到resultQueue中,这里的workQueue和resultQueue都是现成安全的,其内部对各个线程的操作做了互斥。当从workQueue中获取任务超时,则线程结束。 WorkerManager负责初始化Worker线程,提供将任务加入队列和获取结果的接口,并能等待所有任务完成。 一个典型的测试例子如下,它用10个线程去下载一个固定页面的内容,实际应用时应该是执行不同的任务。 undefinedview plaincopy to clipboardprint? def test_job(id, sleep = 0.001 ):        try:            urllib.urlopen('https://www.gmail.com/').read()        except:            print '[%4d]' % id, sys.exc_info()[:2]        return  id      def test():        import socket       socket.setdefaulttimeout(10)        print 'start testing'        wm = WorkerManager(10)        for i in range(500):            wm.add_job( test_job, i, i*0.001 )        wm.start()        wm.wait_for_complete()        print 'end testing'      def test_job(id, sleep = 0.001 ): try: urllib.urlopen('https://www.gmail.com/').read() except: print '[%4d]' % id, sys.exc_info()[:2] return id def test(): import socket socket.setdefaulttimeout(10) print 'start testing' wm = WorkerManager(10) for i in range(500): wm.add_job( test_job, i, i*0.001 ) wm.start() wm.wait_for_complete() print 'end testing' 完成的程序可以在这里下载。


阅读全文(5178) | 回复(0) | 编辑 | 精华
 



发表评论:
昵称:
密码:
主页:
标题:
验证码:  (不区分大小写,请仔细填写,输错需重写评论内容!)



站点首页 | 联系我们 | 博客注册 | 博客登陆

Sponsored By W3CHINA
W3CHINA Blog 0.8 Processed in 0.051 second(s), page refreshed 144798977 times.
《全国人大常委会关于维护互联网安全的决定》  《计算机信息网络国际联网安全保护管理办法》
苏ICP备05006046号