题 Python相当于Golang在频道上的选择


Go有一个适用于频道的select语句。从文档:

select语句允许goroutine等待多次通信   操作。

一个选择块直到其中一个案例可以运行,然后执行它   案件。如果多个准备好,它会随机选择一个。

是否有Python等效的以下代码:

package main

import "fmt"

func main() {
    c1 := make(chan int)
    c2 := make(chan int)
    quit := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            c1 <- i
        }
        quit <- 0
    }()

    go func() {
        for i := 0; i < 2; i++ {
            c2 <- i
        }
    }()

    for {
        select {
        case <-c1:
            fmt.Println("Received value from c1")
        case <-c2:
            fmt.Println("Received value from c2")
        case <-quit:
            fmt.Println("quit")
            return
        }
    }
}

该计划的输出:

Received value from c1
Received value from c1
Received value from c2
Received value from c1
Received value from c2
Received value from c1
Received value from c1
Received value from c1
Received value from c1
Received value from c1
Received value from c1
Received value from c1
quit

14
2017-10-02 06:16


起源


可能重复 在多个Python多处理队列上“选择”? - Erik Allik


答案:


这是一个非常直接的翻译,但“选择哪个如果多个已经准备好”部分的工作方式不同 - 它只是采取了先发生的事情。这也就像运行代码一样 gomaxprocs(1)

import threading
import Queue

def main():
    c1 = Queue.Queue(maxsize=0)
    c2 = Queue.Queue(maxsize=0)
    quit = Queue.Queue(maxsize=0)

    def func1():
        for i in range(10):
            c1.put(i)
        quit.put(0)

    threading.Thread(target=func1).start()

    def func2():
        for i in range(2):
            c2.put(i)

    threading.Thread(target=func2).start()

    combined = Queue.Queue(maxsize=0)

    def listen_and_forward(queue):
        while True:
            combined.put((queue, queue.get()))

    t = threading.Thread(target=listen_and_forward, args=(c1,))
    t.daemon = True
    t.start()
    t = threading.Thread(target=listen_and_forward, args=(c2,))
    t.daemon = True
    t.start()
    t = threading.Thread(target=listen_and_forward, args=(quit,))
    t.daemon = True
    t.start()

    while True:
        which, message = combined.get()
        if which is c1:
            print 'Received value from c1'
        elif which is c2:
            print 'Received value from c2'
        elif which is quit:
            print 'Received value from quit'
            return
main()

基本的变化是使用组合消息的线程模拟选择。如果您打算多使用此模式,可以编写一些选择代码:

import threading
import Queue

def select(*queues):
    combined = Queue.Queue(maxsize=0)
    def listen_and_forward(queue):
        while True:
            combined.put((queue, queue.get()))
    for queue in queues:
        t = threading.Thread(target=listen_and_forward, args=(queue,))
        t.daemon = True
        t.start()
    while True:
        yield combined.get()

def main():

    c1 = Queue.Queue(maxsize=0)
    c2 = Queue.Queue(maxsize=0)
    quit = Queue.Queue(maxsize=0)

    def func1():
        for i in range(10):
            c1.put(i)
        quit.put(0)

    threading.Thread(target=func1).start()

    def func2():
        for i in range(2):
            c2.put(i)

    threading.Thread(target=func2).start()

    for which, msg in select(c1, c2, quit):
        if which is c1:
            print 'Received value from c1'
        elif which is c2:
            print 'Received value from c2'
        elif which is quit:
            print 'Received value from quit'
            return
main()

但...

请注意,这个选择不是很好的选择,虽然它对你的程序无关紧要 - 如果我们并不总是遍历,那么goroutine可以在一个将在select中排队的频道发送结果并丢失选择完成!


10
2017-10-02 06:57





还要考虑一下 偏移库 作者:Benoit Chesneau。它是Python的Go并发模型的一个端口,使用了光纤。

他在PyCon APAC 2013上发表了关于此事的演讲:


8
2017-10-02 18:04





您可以使用 multiprocessing.Pipe 代替 chanthreading.Thread 代替 go 和 select.select 代替 select

下面是使用这种方法在Python中重新实现你的go示例:

import random
from multiprocessing import Pipe
from select import select
from threading import Thread


def main():
    c1_r, c1_w = Pipe(duplex=False)
    c2_r, c2_w = Pipe(duplex=False)
    quit_r, quit_w = Pipe(duplex=False)

    def func1():
        for i in range(10):
            c1_w.send(i)
        quit_w.send(0)

    Thread(target=func1).start()

    def func2():
        for i in range(2):
            c2_w.send(i)

    Thread(target=func2).start()

    while True:
        ready, _, _ = select([c1_r, c2_r, quit_r], [], [])
        which = random.choice(ready)
        if which == c1_r:
            c1_r.recv()
            print 'Received value from c1'
        elif which == c2_r:
            c2_r.recv()
            print 'Received value from c2'
        elif which == quit_r:
            quit_r.recv()
            print 'Received value from quit'
            return

if __name__ == '__main__':
    main()

这个实现基于@Thomas的实现,但与@ Thomas不同,它不会产生额外的线程来执行select。

使用Python 2.7.13在Linux上测试。 Windows可能表现不同,因为select是Unixy的东西。


5
2018-03-16 01:24





使用Python 3.5有关键字 async 和 await 这使得可以具有可以在执行中暂停的功能,从而能够在偶数循环而不是线程上运行。该 asyncio std lib正在提供一个。

更直接地映射Go阻塞通道的行为 select 你可以利用 这个小图书馆 然后你的示例代码在Python中看起来非常相似。


3
2018-02-22 00:37





这是另一个,模仿go语法的尝试:

from threading import Thread
from Queue import Queue

def main():

    c1 = Queue.Queue(maxsize=0)
    c2 = Queue.Queue(maxsize=0)
    quit = Queue.Queue(maxsize=0)

    Thread(target=lambda: [c1.put(i) for i in range(10)] or quit.put(0)).start()
    Thread(target=lambda: [c2.put(i) for i in range(2)]).start()

    for which, msg in select(c1, c2, quit):
        if which is c1:
            print 'Received value from c1'
        elif which is c2:
            print 'Received value from c2'
        elif which is quit:
            print 'Received value from quit'
            return

def select(*queues):
    combined = Queue.Queue(maxsize=0)
    def listen_and_forward(queue):
        while True:
            combined.put((queue, queue.get()))
    for queue in queues:
        t = Thread(target=listen_and_forward, args=(queue,))
        t.daemon = True
        t.start()
    while True:
        yield combined.get()

main()

2
2017-10-02 07:44





是的,一切皆有可能 goless。你可以尝试一下。

玩的开心 ;-)

这是一个例子:

c1 = goless.chan()
c2 = goless.chan()

def func1():
    time.sleep(1)
    c1.send('one')
goless.go(func1)

def func2():
    time.sleep(2)
    c2.send('two')
goless.go(func2)

for i in range(2):
    case, val = goless.select([goless.rcase(c1), goless.rcase(c2)])
    print(val)

2
2017-09-01 10:52