2015年6月11日 星期四

Multi-thread + Queue

queue + thread 使用範例

Thread:
只需要寫 __init__() 和 run(),join()是為了 stop_event 的 sync
thread object init完後 call xxx_thread.start() 即可 start thread

Queue:
把要執行的工作都用 put 丟進queue,thread run() function會去 get queue data 來處理
記住 - 處理完後必須 call q.task_done() 來讓 Queue 知道 task 已經完成
Queue.Queue(10) : max queue size is 10
如 task get 速度不夠快,put() 會 block 住直到有空的 queue 可以 put 進去
q.join() : Blocks until all items in the Queue have been gotten and processed
(gotten: get(),  processed: task_done())

因為沒有 terminate thread 的方法,利用 stop_event 來讓 thread 自己 terminate:
join() -> stop_event.set()
run() -> detect到stop_event.isSet()就離開while loop

get() 會 block 直到有 task 才會 return,為避免queue empty時 thread 被 get() block住無法正常結束,可以把 get() 改成 get(block=True, timeout=1)
timeout時間到了後會raise Queue.Empty exception,如此就可跳出 get()


import Queue
from threading import Timer, Thread, Event
import time

class test_thread(Thread):
    def __init__(self, q):
        Thread.__init__(self)
        self.q = q
        self.stop_event = Event()
    def run(self):
        q = self.q
        while not self.stop_event.isSet():
            try:
                a, b, c = q.get(block=True, timeout=1)
            except Queue.Empty:
                print "empty queue"
                continue
            print a, b, c
            q.task_done()
        print "thread end"
    def join(self, timeout=None):
        print "thread join"
        self.stop_event.set()
        Thread.join(self, timeout)

q = Queue.Queue(10)

consumer_thread = test_thread(q)
consumer_thread.start()

for i in range(100):
    q.put([i, i+1, i+2])
q.join()
consumer_thread.join()
print "done"