题 如何在Python中使用线程?


我试图理解Python中的线程。我已经查看了文档和示例,但坦率地说,许多示例过于复杂,我无法理解它们。

您如何清楚地显示为多线程划分的任务?


932
2018-05-17 04:24


起源


围绕这个主题的一个很好的一般性讨论可以在 Python最难的问题 作者:Jeff Knupp。总之,似乎线程不适合初学者。 - Matthew Walker
哈哈,我倾向于认为线程适合每个人,但初学者不是为了线程:))))) - Bohdan
只是为了标记人们应该阅读所有的答案,因为后来的答案可以说是更好的,因为新的语言功能被利用... - Gwyn Evans
记得用C编写核心逻辑并通过ctypes调用它来真正利用Python线程。 - aaa90210
我只想补充一点 PyPubSub 是一种发送和接收消息来控制线程流的好方法 - ytpillai


答案:


自2010年提出这个问题以来,如何使用python进行简单的多线程处理已经有了真正的简化 地图 和

下面的代码来自一篇文章/博客文章,你绝对应该检查(没有隶属关系) - 并行在一行: 日常线程任务的更好模型。我将在下面总结 - 它最终只是几行代码:

from multiprocessing.dummy import Pool as ThreadPool 
pool = ThreadPool(4) 
results = pool.map(my_function, my_array)

哪个是多线程版本:

results = []
for item in my_array:
    results.append(my_function(item))

描述

Map是一个很酷的小函数,是轻松将并行性注入Python代码的关键。对于那些不熟悉的人来说,地图是从像Lisp这样的函数式语言中解脱出来的。它是一个在序列上映射另一个函数的函数。

Map为我们处理序列上的迭代,应用函数,并将所有结果存储在最后的方便列表中。

enter image description here


履行

地图函数的并行版本由两个库提供:多处理,以及它鲜为人知,但同样出色的步骤子:multiprocessing.dummy。

multiprocessing.dummy 与多处理模块完全相同, 但是使用线程代替 (一个重要的区别  - 为CPU密集型任务使用多个进程; IO期间(和期间)的线程):

multiprocessing.dummy复制多处理的API,但只不过是线程模块的包装器。

import urllib2 
from multiprocessing.dummy import Pool as ThreadPool 

urls = [
  'http://www.python.org', 
  'http://www.python.org/about/',
  'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  'http://www.python.org/doc/',
  'http://www.python.org/download/',
  'http://www.python.org/getit/',
  'http://www.python.org/community/',
  'https://wiki.python.org/moin/',
]

# make the Pool of workers
pool = ThreadPool(4) 

# open the urls in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)

# close the pool and wait for the work to finish 
pool.close() 
pool.join() 

时间结果如下:

Single thread:   14.4 seconds
       4 Pool:   3.1 seconds
       8 Pool:   1.4 seconds
      13 Pool:   1.3 seconds

传递多个参数 (就像这样 仅在Python 3.3及更高版本中):

要传递多个数组:

results = pool.starmap(function, zip(list_a, list_b))

或传递常量和数组:

results = pool.starmap(function, zip(itertools.repeat(constant), list_a))

如果您使用的是早期版本的Python,则可以通过多个参数传递 这个解决方法

(谢谢 user136036 对于有用的评论)


1013
2018-02-11 19:53



这只是缺乏选票,因为它是如此新鲜发布。这个答案非常有效并且演示了“地图”功能,它提供了比其他答案更容易理解的语法。 - idle
这是偶数线程而不是进程吗?它似乎尝试多进程!=多线程 - wolfdawn
顺便说一句,伙计们,你可以写 with Pool(8) as p: p.map( *whatever* ) 并摆脱簿记线。 - Barafu Albino
@BarafuAlbino:很有用,可能值得注意这一点 仅适用于Python 3.3+。 - fuglede
你怎么能留下这个答案,而不是提到这只对I / O操作有用?这只能在一个线程上运行,这对大多数情况来说都是无用的,并且实际上比正常方式更慢 - Frobot


这是一个简单的示例:您需要尝试一些备用URL并返回第一个要响应的内容。

import Queue
import threading
import urllib2

# called by each thread
def get_url(q, url):
    q.put(urllib2.urlopen(url).read())

theurls = ["http://google.com", "http://yahoo.com"]

q = Queue.Queue()

for u in theurls:
    t = threading.Thread(target=get_url, args = (q,u))
    t.daemon = True
    t.start()

s = q.get()
print s

这是一种将线程用作简单优化的情况:每个子线程都在等待URL解析和响应,以便将其内容放在队列中;每个线程都是一个守护进程(如果主线程结束,则不会保持进程 - 这比不常见);主线程启动所有子线程,做一个 get 在队列中等待其中一个人完成了 put然后发出结果并终止(这会删除任何可能仍在运行的子线程,因为它们是守护程序线程)。

在Python中正确使用线程总是与I / O操作相关联(因为CPython无论如何都不使用多个内核来运行CPU绑定任务,因此在等待某些I / O时,线程的唯一原因并不是阻止进程)。顺便说一下,队列几乎总是将工作分配到线程和/或收集工作结果的最佳方式,并且它们本质上是线程安全的,因此它们可以避免担心锁,条件,事件,信号量和其他线程协调/通信概念。


672
2018-05-17 04:36



再次感谢,MartelliBot。我已经更新了示例,等待所有网址响应:import Queue,threading,urllib2 q = Queue.Queue()urls ='''a.com  b.com  c.com ''”。分割()  urls_received = 0 def get_url(q,url):req = urllib2.Request(url)resp = urllib2.urlopen(req)q.put(resp.read())global urls_received urls_received + = 1在url中打印urls_received: t = threading.Thread(target = get_url,args =(q,u))t.daemon = True t.start()while q.empty()和urls_received <len(urls):s = q.get()print小号 - htmldrum
@JRM:如果你看下面的下一个答案,我认为等待线程完成的更好方法是使用 join() 方法,因为这会使主线程等到它们完成而不通过不断检查值来消耗处理器。 @Alex:谢谢,这正是我需要了解如何使用线程。 - krs013
对于python3,将'import urllib2'替换为'import urllib.request as urllib2'。并在括号中加上括号。 - Harvey
对于python 3替换 Queue 模块名称 queue。方法名称是相同的。 - JSmyth
我注意到该解决方案只会打印出其中一个页面。要从队列中打印两个页面,只需再次运行该命令: s = q.get()  print s  @ krs013你不需要 join 因为Queue.get()是阻塞的。 - Tom Anderson


注意:对于Python中的实际并行化,您应该使用  用于分叉并行执行的多个进程的模块(由于全局解释器锁定,Python线程提供交错但实际上是串行执行,而不是并行执行,并且仅在交叉I / O操作时有用)。

但是,如果您只是在寻找交错(或者正在进行可以并行化的I / O操作,尽管全局解释器锁定),那么 穿线 模块是开始的地方。作为一个非常简单的例子,让我们通过并行求和子范围来考虑求和大范围的问题:

import threading

class SummingThread(threading.Thread):
     def __init__(self,low,high):
         super(SummingThread, self).__init__()
         self.low=low
         self.high=high
         self.total=0

     def run(self):
         for i in range(self.low,self.high):
             self.total+=i


thread1 = SummingThread(0,500000)
thread2 = SummingThread(500000,1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join()  # This waits until the thread has completed
thread2.join()  
# At this point, both threads have completed
result = thread1.total + thread2.total
print result

请注意,上面是一个非常愚蠢的例子,因为它完全没有I / O,并且由于全局解释器锁定而在CPython中将被串行执行,尽管是交错的(带有上下文切换的额外开销)。


226
2018-05-17 04:35



@Alex,我没有说它是实用的,但它确实演示了如何定义和生成线程,我认为这是OP想要的。 - Michael Aaron Safyan
虽然这确实显示了如何定义和生成线程,但它实际上并不是并行地对子范围求和。 thread1 在主线程阻塞之前运行直到它完成,然后同样的事情发生 thread2,然后主线程恢复并打印出它们累积的值。 - martineau
不应该这样 super(SummingThread, self).__init__()?如在 stackoverflow.com/a/2197625/806988 - James Andres
@JamesAndres,假设没有人继承“SummingThread”,那么任何一个都可以正常工作;在这种情况下,super(SummingThread,self)只是在方法解析顺序(MRO)中查找下一个类的一种奇特方式,它是threading.Thread(然后调用) 在里面 在这两种情况下)。但是,你是对的,使用super()是当前Python的更好的风格。当我提供这个答案时,Super是相对较新的,因此直接调用超类而不是使用super()。不过,我会更新这个以使用super。 - Michael Aaron Safyan
警告:不要在这样的任务中使用多线程!如Dave Beazley所示: dabeaz.com/python/NewGIL.pdf2个CPU上的2个python线程执行CPU重度任务2次SLOWER比1个CPU上的1个线程和1个CPU上2个线程的SLOWER慢1.5倍。这种奇怪的行为是由于操作系统和Python之间的协调错误协调造成的。线程的实际用例是I / O繁重的任务。例如。当您通过网络执行读/写操作时,将等待数据读取/写入的线程放到后台并将CPU切换到另一个需要处理数据的线程是有意义的。 - Boris Burkov


与其他提到的一样,由于GIL,CPython只能使用线程进行I \ O等待。 如果要从CPU绑定任务的多个内核中受益,请使用

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

87
2018-03-08 22:22



你能解释一下这有什么作用吗? - pandita
@pandita:代码创建一个进程,然后启动它。所以现在有两件事情同时发生:程序的主线,以及从目标开始的过程, f 功能。与此同时,主程序现在只是等待进程退出, join用它。如果主要部分刚刚退出,则子进程可能会或可能不会运行完成,因此执行a join 总是值得推荐。 - johntellsall
一个扩展的答案,包括 map 功能在这里: stackoverflow.com/a/28463266/2327328 - philshem
@philshem小心b / c你发布的链接是使用这里提到的线程池(不是进程) stackoverflow.com/questions/26432411/...。但是,这个答案正在使用一个过程。我是新手,但似乎(由于GIL),在Python中使用多线程时,你只会在特定情况下获得性能提升。但是,使用一个进程池可以利用多核处理器,在一个进程上有超过1个核心工作。 - user3731622
这是实际做一些有用的事情并利用多个CPU核心的最佳答案 - Frobot


只需注意,线程不需要队列。

这是我能想象的最简单的例子,它显示了10个并发运行的进程。

import threading
from random import randint
from time import sleep


def print_number(number):
    # Sleeps a random 1 to 10 seconds
    rand_int_var = randint(1, 10)
    sleep(rand_int_var)
    print "Thread " + str(number) + " slept for " + str(rand_int_var) + " seconds"

thread_list = []

for i in range(1, 10):
    # Instantiates the thread
    # (i) does not make a sequence, so (i,)
    t = threading.Thread(target=print_number, args=(i,))
    # Sticks the thread in a list so that it remains accessible
    thread_list.append(t)

# Starts threads
for thread in thread_list:
    thread.start()

# This blocks the calling thread until the thread whose join() method is called is terminated.
# From http://docs.python.org/2/library/threading.html#thread-objects
for thread in thread_list:
    thread.join()

# Demonstrates that the main process waited for threads to complete
print "Done"

85
2017-09-23 16:07



将最后一个引号添加到“Done to make it print”Done“ - iChux
我比Martelli更喜欢这个例子,它更容易玩。但是,我建议printNumber执行以下操作,使其更清晰一些:它应该在睡眠之前将randint保存到变量中,然后将打印更改为“Thread”+ str(数字)+“睡觉”+ theRandintVariable +“秒” - Nickolai
有没有办法知道每个线程何时完成,因为它完成了? - Matt
@Matt有几种方法可以做这样的事情,但这取决于你的需求。一种方法是更新单个或一些其他可公开访问的变量,这些变量在while循环中被观察并在线程结束时更新。 - Douglas Adams


Alex Martelli给出的答案对我有所帮助,但是这里的修改版本我认为更有用(至少对我而言)。

import Queue
import threading
import urllib2

worker_data = ['http://google.com', 'http://yahoo.com', 'http://bing.com']

#load up a queue with your data, this will handle locking
q = Queue.Queue()
for url in worker_data:
    q.put(url)

#define a worker function
def worker(queue):
    queue_full = True
    while queue_full:
        try:
            #get your data off the queue, and do some work
            url= queue.get(False)
            data = urllib2.urlopen(url).read()
            print len(data)

        except Queue.Empty:
            queue_full = False

#create as many threads as you want
thread_count = 5
for i in range(thread_count):
    t = threading.Thread(target=worker, args = (q,))
    t.start()

39
2017-10-01 15:50



为什么不突破异常呢? - Stavros Korokithakis
你可以,只是个人喜好 - JimJty
我没有运行代码,但是你不需要守护线程吗?我认为在最后一个for循环之后,你的程序可能会退出 - 至少它应该是因为这就是线程应该如何工作。我认为更好的方法不是将工作数据放入队列中,而是将输出放入队列,因为那时你可以有一个不仅仅是一个主循环 手柄 从工人那里进入队列的信息,但现在它也没有线程,而你 知道 它不会过早退出。 - dylnmc
@dylnmc,这是我的用例之外(我的输入队列是预定义的)。如果你想走你的路线,我建议看看 芹菜 - JimJty


我发现这非常有用:创建与核心一样多的线程并让它们执行(大量)任务(在这种情况下,调用shell程序):

import Queue
import threading
import multiprocessing
import subprocess

q = Queue.Queue()
for i in range(30): #put 30 tasks in the queue
    q.put(i)

def worker():
    while True:
        item = q.get()
        #execute a task: call a shell program and wait until it completes
        subprocess.call("echo "+str(item), shell=True) 
        q.task_done()

cpus=multiprocessing.cpu_count() #detect number of cores
print("Creating %d threads" % cpus)
for i in range(cpus):
     t = threading.Thread(target=worker)
     t.daemon = True
     t.start()

q.join() #block until all tasks are done

19
2018-06-06 23:51



@shavenwarthog肯定可以根据个人的需要调整“cpus”变量。无论如何,子进程调用将产生子进程,这些将由OS分配cpus(python的“父进程”并不代表子进程的“相同CPU”)。 - dolphin
你是对的,我对“线程与父进程在同一CPU上启动”的评论是错误的。谢谢回复! - johntellsall
也许值得注意的是,与使用相同内存空间的多线程不同,多处理不能轻易地共享变量/数据。虽然+1。 - fantabolous


对我来说,线程的完美示例是监视异步事件。看看这段代码。

# thread_test.py
import threading
import time 

class Monitor(threading.Thread):
    def __init__(self, mon):
        threading.Thread.__init__(self)
        self.mon = mon

    def run(self):
        while True:
            if self.mon[0] == 2:
                print "Mon = 2"
                self.mon[0] = 3;

您可以通过打开IPython会话并执行以下操作来使用此代码:

>>>from thread_test import Monitor
>>>a = [0]
>>>mon = Monitor(a)
>>>mon.start()
>>>a[0] = 2
Mon = 2
>>>a[0] = 2
Mon = 2

等几分钟

>>>a[0] = 2
Mon = 2

15
2018-04-14 04:18



我们如何手动停止线程? @ dvreed77 - deeshank
AttributeError:'Monitor'对象没有属性'stop'? - pandita
在等待事件发生时,你不是在甩掉CPU周期吗?并不总是非常实际的事情。 - mogul
像大人说的那样,这将会不断执行。至少你可以添加一个短暂的睡眠,比如睡眠(0.1),这可能会大大减少cpu的使用,就像这样一个简单的例子。 - fantabolous
这是一个可怕的例子,浪费了一个核心。至少添加一个睡眠,但正确的解决方案是使用一些信号机制。 - PureW


给定一个功能, f,像这样穿线:

import threading
threading.Thread(target=f).start()

传递参数 f

threading.Thread(target=f, args=(a,b,c)).start()

15
2018-03-16 16:07



这非常简单。当你完成线程时,如何确保线程关闭? - cameronroytaylor
据我所知,当函数退出时 Thread 对象清理。看到 文档。有一个 is_alive() 如果需要,可以用来检查线程的方法。 - starfry
我看到了 is_alive 方法,但我无法弄清楚如何将其应用于线程。我试过分配 thread1=threading.Thread(target=f).start() 然后检查它 thread1.is_alive()但是 thread1 填充 None,那里没有运气。你知道是否有其他方式来访问该线程? - cameronroytaylor
您需要将线程对象分配给变量,然后使用该变量启动它: thread1=threading.Thread(target=f) 其次是 thread1.start()。那你可以做 thread1.is_alive()。 - starfry
那很有效。是的,测试 thread1.is_alive() 回报 False 一旦功能退出。 - cameronroytaylor


Python 3具有以下功能 启动并行任务。这使我们的工作更轻松。

它有 线程池 和 流程池

以下是一个见解:

ThreadPoolExecutor示例

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

14
2017-07-20 11:17