题 发电机是否有线程安全?


我有一个多线程程序,我在其中创建一个生成器函数,然后将其传递给新线程。我希望它在本质上是共享/全局的,因此每个线程都可以从生成器获取下一个值。

使用这样的生成器是安全的,还是会遇到从多个线程访问共享生成器的问题/条件?

如果没有,是否有更好的方法来解决问题?我需要一些循环遍历列表并为任何线程调用它生成下一个值的东西。


37
2017-07-15 13:32


起源




答案:


它不是线程安全的;同时调用可能会交错,并与局部变量混淆。

常见的方法是使用主从模式(现在称为PC中的农民工模式)。创建一个生成数据的第三个线程,并在主服务器和从服务器之间添加一个队列,其中从服务器将从队列中读取,并且主服务器将写入该队列。标准队列模块提供必要的线程安全性并安排阻塞主站,直到从站准备好读取更多数据。


50
2017-07-15 13:37



对Queue.Queue来说肯定是+1,这是组织线程系统的好方法(大部分时间都适用于此任务)。 - Alex Martelli


编辑在下面添加基准。

你可以用锁来包裹发电机。例如,

import threading
class LockedIterator(object):
    def __init__(self, it):
        self.lock = threading.Lock()
        self.it = it.__iter__()

    def __iter__(self): return self

    def next(self):
        self.lock.acquire()
        try:
            return self.it.next()
        finally:
            self.lock.release()

gen = [x*2 for x in [1,2,3,4]]
g2 = LockedIterator(gen)
print list(g2)

锁定在我的系统上需要50ms,Queue需要350ms。当你真的有队列时,队列很有用;例如,如果您有传入的HTTP请求,并且您希望将它们排队以供工作线程处理。 (这不适合Python迭代器模型 - 一旦迭代器耗尽了项目,就完成了。)如果你确实有迭代器,那么LockedIterator是一种更快捷,更简单的方法来使其成为线程安全的。

from datetime import datetime
import threading
num_worker_threads = 4

class LockedIterator(object):
    def __init__(self, it):
        self.lock = threading.Lock()
        self.it = it.__iter__()

    def __iter__(self): return self

    def next(self):
        self.lock.acquire()
        try:
            return self.it.next()
        finally:
            self.lock.release()

def test_locked(it):
    it = LockedIterator(it)
    def worker():
        try:
            for i in it:
                pass
        except Exception, e:
            print e
            raise

    threads = []
    for i in range(num_worker_threads):
        t = threading.Thread(target=worker)
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

def test_queue(it):
    from Queue import Queue
    def worker():
        try:
            while True:
                item = q.get()
                q.task_done()
        except Exception, e:
            print e
            raise

    q = Queue()
    for i in range(num_worker_threads):
         t = threading.Thread(target=worker)
         t.setDaemon(True)
         t.start()

    t1 = datetime.now()

    for item in it:
        q.put(item)

    q.join()

start_time = datetime.now()
it = [x*2 for x in range(1,10000)]

test_locked(it)
#test_queue(it)
end_time = datetime.now()
took = end_time-start_time
print "took %.01f" % ((took.seconds + took.microseconds/1000000.0)*1000)

41
2017-07-15 19:55



使用Queue.Queue效率较低,但做得很好。 - gooli


不,它们不是线程安全的。您可以在以下位置找到有关生成器和多线程的有趣信息:

http://www.dabeaz.com/generators/Generators.pdf


5
2017-07-15 13:44





这取决于您正在使用的python实现。在CPython中,GIL使python对象上的所有操作都是线程安全的,因为在任何给定时间只有一个线程可以执行代码。

http://en.wikipedia.org/wiki/Global_Interpreter_Lock


-8
2017-07-15 15:54



“GIL使python对象上的所有操作都线程安全” - 是吧?所有操作都不是原子的 - Corey Goldberg
这是危险的误导。 GIL只意味着Python代码不会在多线程环境中破坏Python状态:你不能在字节码操作中改变线程。 (例如,您可以修改共享的dict而不会破坏它。)您仍然可以在任意两个字节码操作之间更改线程。 - Glenn Maynard