I am trying to understand threading in Python. I've looked at the documentation and examples, but frankly, many examples are overly sophisticated and I'm having trouble understanding them.
How do you clearly show the tasks being divided for multi-threading?
Since this question was asked in 2010, there has been real simplification in how to do simple multithreading with Python, using map and pool.
The code below comes from an article/blog post that you should definitely check out (no affiliation): Parallelism in one line: A Better Model for Day to Day Threading Tasks. I'll summarize below; it ends up being just a few lines of code:
from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
results = pool.map(my_function, my_array)
Which is the multithreaded version of:
results = []
for item in my_array:
    results.append(my_function(item))
Description
Map is a cool little function, and the key to easily injecting parallelism into your Python code. For those unfamiliar, map is something lifted from functional languages like Lisp. It is a function which maps another function over a sequence.
Map handles the iteration over the sequence for us, applies the function, and stores all of the results in a handy list at the end.
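As a point of reference, the serial built-in map behaves like this (square is just an illustrative function):

```python
# The built-in, serial version of map: apply a function over a sequence
def square(x):
    return x * x

results = list(map(square, [1, 2, 3, 4]))
print(results)
```

The parallel versions below keep exactly this calling convention, which is what makes them such a drop-in improvement.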
Implementation
Parallel versions of the map function are provided by two libraries: multiprocessing, and also its little known, but equally fantastic stepchild: multiprocessing.dummy.
multiprocessing.dummy
Identical to the multiprocessing module, but uses threads instead (an important distinction: use multiple processes for CPU-intensive tasks; threads for (and during) I/O):
multiprocessing.dummy replicates the API of multiprocessing, but is no more than a wrapper around the threading module.
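Putting the pieces together, here is the whole pattern in a runnable, self-contained form (square stands in for my_function; in Python 3 the pool lives in the same place):

```python
from multiprocessing.dummy import Pool as ThreadPool

def square(x):  # stand-in for my_function; any callable works
    return x * x

pool = ThreadPool(4)                      # 4 worker threads
results = pool.map(square, [1, 2, 3, 4])  # same semantics as the built-in map
pool.close()                              # no more work will be submitted
pool.join()                               # wait for the workers to finish
print(results)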
import urllib2
from multiprocessing.dummy import Pool as ThreadPool

urls = [
    'http://www.python.org',
    'http://www.python.org/about/',
    'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
    'http://www.python.org/doc/',
    'http://www.python.org/download/',
    'http://www.python.org/getit/',
    'http://www.python.org/community/',
    'https://wiki.python.org/moin/',
]

# Make the Pool of workers
pool = ThreadPool(4)

# Open the URLs in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)

# Close the pool and wait for the work to finish
pool.close()
pool.join()
And here are the timing results:
Single thread: 14.4 seconds
4 Pool: 3.1 seconds
8 Pool: 1.4 seconds
13 Pool: 1.3 seconds
Passing multiple arguments (works like this only in Python 3.3 and later):
To pass multiple arrays:
results = pool.starmap(function, zip(list_a, list_b))
Or to pass a constant and an array:
results = pool.starmap(function, zip(itertools.repeat(constant), list_a))
If you are using an earlier version of Python, you can pass multiple arguments via this workaround.
(Thanks to user136036 for the helpful comment.)
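Both starmap forms can be sketched in one runnable snippet (add is an illustrative two-argument function):

```python
import itertools
from multiprocessing.dummy import Pool as ThreadPool

def add(a, b):
    return a + b

with ThreadPool(4) as pool:
    # Two arrays zipped into per-call argument tuples
    pairwise = pool.starmap(add, zip([1, 2, 3], [10, 20, 30]))
    # A constant repeated alongside each element of one array
    offset = pool.starmap(add, zip(itertools.repeat(100), [1, 2, 3]))

print(pairwise)
print(offset)
```

starmap unpacks each tuple into positional arguments, which is what lets a single-argument-style pool call a multi-argument function.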
Here's a simple example: you need to try a few alternative URLs and return the contents of the first one to respond.
import Queue
import threading
import urllib2

# Called by each thread
def get_url(q, url):
    q.put(urllib2.urlopen(url).read())

theurls = ["http://google.com", "http://yahoo.com"]

q = Queue.Queue()

for u in theurls:
    t = threading.Thread(target=get_url, args=(q, u))
    t.daemon = True
    t.start()

s = q.get()
print s
This is a case where threading is used as a simple optimization: each subthread is waiting for a URL to resolve and respond, in order to put its contents on the queue; each thread is a daemon (won't keep the process up if the main thread ends, which is more common than not); the main thread starts all subthreads, does a get on the queue to wait until one of them has done a put, then emits the result and terminates (which takes down any subthreads that might still be running, since they're daemon threads).
Proper use of threads in Python is invariably connected to I/O operations (since CPython doesn't use multiple cores to run CPU-bound tasks anyway, the only reason for threading is to avoid blocking the process while waiting for some I/O). Queues are almost invariably the best way to farm work out to threads and/or collect the results, by the way, and they're intrinsically thread-safe, so they save you from worrying about locks, conditions, events, semaphores, and other inter-thread coordination/communication concepts.
Note: for actual parallelization in Python, you should use the multiprocessing module to fork multiple processes that execute in parallel (due to the global interpreter lock, Python threads provide interleaving, but they are in fact executed serially, not in parallel, and are only useful when interleaving I/O operations).
However, if you are merely looking for interleaving (or are doing I/O operations that can be parallelized despite the global interpreter lock), then the threading module is the place to start. As a really simple example, let's consider the problem of summing a big range by summing subranges in parallel:
import threading

class SummingThread(threading.Thread):
    def __init__(self, low, high):
        super(SummingThread, self).__init__()
        self.low = low
        self.high = high
        self.total = 0

    def run(self):
        for i in range(self.low, self.high):
            self.total += i

thread1 = SummingThread(0, 500000)
thread2 = SummingThread(500000, 1000000)
thread1.start()  # This actually causes the thread to run
thread2.start()
thread1.join()   # This waits until the thread has completed
thread2.join()
# At this point, both threads have completed
result = thread1.total + thread2.total
print result
Note that the above is a very silly example, as it does absolutely no I/O and will be executed serially, albeit interleaved (with the added overhead of context switching), in CPython due to the global interpreter lock.
As others have mentioned, due to the GIL, CPython can use threads only for I/O waits. If you want to benefit from multiple cores for CPU-bound tasks, use multiprocessing:
from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
Just a note: a queue is not required for threading.
This is the simplest example I could imagine that shows 10 processes running concurrently.
import threading
from random import randint
from time import sleep

def print_number(number):
    # Sleeps a random 1 to 10 seconds
    rand_int_var = randint(1, 10)
    sleep(rand_int_var)
    print "Thread " + str(number) + " slept for " + str(rand_int_var) + " seconds"

thread_list = []

for i in range(1, 10):
    # Instantiates the thread
    # (i) does not make a sequence, so (i,)
    t = threading.Thread(target=print_number, args=(i,))
    # Sticks the thread in a list so that it remains accessible
    thread_list.append(t)

# Starts threads
for thread in thread_list:
    thread.start()

# This blocks the calling thread until the thread whose join() method is called is terminated.
# From http://docs.python.org/2/library/threading.html#thread-objects
for thread in thread_list:
    thread.join()

# Demonstrates that the main process waited for threads to complete
print "Done"
The answer from Alex Martelli helped me. However, here is a modified version that I thought was more useful (at least to me).
import Queue
import threading
import urllib2

worker_data = ['http://google.com', 'http://yahoo.com', 'http://bing.com']

# Load up a queue with your data. This will handle locking
q = Queue.Queue()
for url in worker_data:
    q.put(url)

# Define a worker function
def worker(queue):
    queue_full = True
    while queue_full:
        try:
            # Get your data off the queue, and do some work
            url = queue.get(False)
            data = urllib2.urlopen(url).read()
            print len(data)
        except Queue.Empty:
            queue_full = False

# Create as many threads as you want
thread_count = 5
for i in range(thread_count):
    t = threading.Thread(target=worker, args=(q,))
    t.start()
I found this very useful: create as many threads as there are cores and let them execute a (large) number of tasks (in this case, calling a shell program):
import Queue
import threading
import multiprocessing
import subprocess

q = Queue.Queue()
for i in range(30):  # Put 30 tasks in the queue
    q.put(i)

def worker():
    while True:
        item = q.get()
        # Execute a task: call a shell program and wait until it completes
        subprocess.call("echo " + str(item), shell=True)
        q.task_done()

cpus = multiprocessing.cpu_count()  # Detect number of cores
print("Creating %d threads" % cpus)
for i in range(cpus):
    t = threading.Thread(target=worker)
    t.daemon = True
    t.start()

q.join()  # Block until all tasks are done
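The same daemon-workers-plus-q.join() pattern carries straight over to Python 3 (Queue became queue); a self-checking sketch with a trivial computation standing in for the shell call:

```python
import queue
import threading
import multiprocessing

q = queue.Queue()
results = []

for i in range(30):  # put 30 tasks in the queue
    q.put(i)

def worker():
    while True:
        item = q.get()
        results.append(item * 2)  # a trivial computation standing in for the shell call
        q.task_done()

for _ in range(multiprocessing.cpu_count()):  # one thread per core
    threading.Thread(target=worker, daemon=True).start()

q.join()  # block until every task has been marked done
print(sorted(results))
```

task_done() must be called once per get(), or q.join() will block forever; that pairing is what lets the main thread know all work has finished.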
For me, the perfect example for threading is monitoring asynchronous events. Look at this code.
# thread_test.py
import threading
import time

class Monitor(threading.Thread):
    def __init__(self, mon):
        threading.Thread.__init__(self)
        self.mon = mon

    def run(self):
        while True:
            if self.mon[0] == 2:
                print "Mon = 2"
                self.mon[0] = 3
You can play with this code by opening an IPython session and doing something like:
>>>from thread_test import Monitor
>>>a = [0]
>>>mon = Monitor(a)
>>>mon.start()
>>>a[0] = 2
Mon = 2
>>>a[0] = 2
Mon = 2
Wait a few minutes
>>>a[0] = 2
Mon = 2
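A note on the busy-wait above: in Python 3 the same "watch for an asynchronous event" idea is usually written with threading.Event, which sleeps instead of spinning; a minimal sketch (names are illustrative):

```python
import threading

event = threading.Event()
seen = []

def monitor():
    event.wait()  # sleeps until the event is set; no CPU burned polling
    seen.append("Mon = 2")

t = threading.Thread(target=monitor)
t.start()
event.set()  # signal the "asynchronous event"
t.join()
print(seen[0])
```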
Given a function, f, thread it like this:
import threading
threading.Thread(target=f).start()
To pass arguments to f:
threading.Thread(target=f, args=(a,b,c)).start()
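A complete runnable version of the same idea (f and its arguments are placeholders; joining before reading the result is what makes this safe):

```python
import threading

results = []

def f(a, b, c):
    results.append(a + b + c)

t = threading.Thread(target=f, args=(1, 2, 3))
t.start()
t.join()  # wait for the thread to finish before reading the result
print(results[0])
```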
Python 3 has the facility of launching parallel tasks. This makes our work easier.
The following gives an insight:
ThreadPoolExecutor example
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
ProcessPoolExecutor
import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()