题 使用并行流返回最快的提供值


我有一组供应商,它们都有相同的结果,但速度不同(且不同)。

我想要一种优雅的方式同时启动供应商,只要其中一个产生了价值,就将其退回(丢弃其他结果)。

我尝试过使用并行流和 Stream.findAny() 为此,它似乎总是阻止,直到产生所有结果。

这是一个单元测试,展示了我的问题:

import org.junit.Test;

import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Stream;

import static org.junit.Assert.*;

public class RaceTest {

    @Test
    public void testRace() {
        // Set up suppliers
        Set<Supplier<String>> suppliers = Collections.newSetFromMap(new ConcurrentHashMap<>());
        suppliers.add(() -> "fast"); // This supplier returns immediately
        suppliers.add(() -> {
            try {
                Thread.sleep(10_000);
                return "slow";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }); // This supplier takes 10 seconds to produce a value

        Stream<Supplier<String>> stream = suppliers.parallelStream();
        assertTrue(stream.isParallel()); // Stream can work in parallel
        long start = System.currentTimeMillis();
        Optional<String> winner = stream
                .map(Supplier::get)
                .findAny();
        long duration = System.currentTimeMillis() - start;
        assertTrue(winner.isPresent()); // Some value was produced
        assertEquals("fast", winner.get()); // The value is "fast"
        assertTrue(duration < 9_000); // The whole process took less than 9 seconds
    }
}

测试结果是最后一个断言失败,因为整个测试大约需要10秒才能完成。

我在这做错了什么?


13
2017-10-06 11:15


起源


我刚刚测试了你的代码并且没有失败(这里是JDK 1.8.0_51)。但 findAny 是不确定的,所以这只是运气。 - Tunaki
如果您添加快速供应商4次(设置大小= 5),它按预期工作,我怀疑当流太小时,它将恢复为顺序执行。 - assylias
源不需要 Concurrent… 这里收藏。任何集合都可以,只要您在操作期间不修改它。实际上,由于它在内部的工作方式,普通集合在此任务中表现更好,即使在并行执行时也是如此。但正如其他人已经说过的那样 Stream API不是正确的工具。 - Holger


答案:


在这种情况下,您最好使用 Callable 代替 Supplier (相同的功能签名)并使用自Java 5以来存在的良好的旧并发API:

Set<Callable<String>> suppliers=new HashSet<>();
suppliers.add(() -> "fast"); // This supplier returns immediately
suppliers.add(() -> {
        Thread.sleep(10_000);
        return "slow";
    }
);

ExecutorService es=Executors.newCachedThreadPool();
try {

    String result = es.invokeAny(suppliers);
    System.out.println(result);

} catch (InterruptedException|ExecutionException ex) {
    Logger.getLogger(MyClass.class.getName()).log(Level.SEVERE, null, ex);
}
es.shutdown();

注意,整个“全部运行并返回最快”的方法如何成为单个方法调用...

一旦有一个结果可用,它还有取消/中断所有待处理操作的好处,因此慢速操作实际上不会在这里等待整整十秒(好吧,在大多数情况下,因为时间不是确定性的)。


11
2017-10-06 12:04



Javadoc invokeAny 说“执行给定的任务,返回已成功完成的任务的结果”。这是否意味着返回的那个总是第一个完成的任务(而不是另一个)? - Tunaki
@Tunaki: interface 不能指定这样的保证,因为它取决于实际执行者实现的逻辑,是否是这种情况。例如,在线程池执行程序的情况下,您必须确保它对线程数没有限制,或者至少有任务可用的任务线程可以按预期工作。遗嘱执行人归还 newCachedThreadPool()没有限制。但最快的回归是明显的意图 invokeAny如果你想让它尽可能正式,那么在并发方面就没有这样的保证...... - Holger
这非常有效,并且还像我预期的那样处理失败的任务(仅在没有任务成功的情况下抛出异常) - Gustav Karlsson


您当前使用的代码是不确定的。引用Javadoc findAny()

此操作的行为明确是不确定的;可以自由选择流中的任何元素。

你可以用一个 CompletionService 并将所有任务提交给它。然后, CompletionService.take() 将返回 Future 第一个完成的任务。

long start = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(suppliers.size());
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
suppliers.forEach(s -> completionService.submit(() -> s.get()));
String winner = completionService.take().get();
long duration = System.currentTimeMillis() - start;
assertEquals("fast", winner); // The value is "fast"
assertTrue(duration < 9_000); // The whole process took less than 9 seconds

3
2017-10-06 11:50



这是恕我直言,比目前接受的答案更具确定性和惯用性。或者说那样: 这正是如此 CompletionService 是为了! - Marco13


Stream API不适合这样的事情,因为它不能保证任务何时完成。更好的解决方案是使用 CompletableFuture

long start = System.currentTimeMillis();
String winner = CompletableFuture
        .anyOf(suppliers.stream().map(CompletableFuture::supplyAsync)
                .toArray(CompletableFuture[]::new)).join().toString();
long duration = System.currentTimeMillis() - start;
assertEquals("fast", winner); // The value is "fast"
assertTrue(duration < 9_000); // The whole process took less than 9 seconds

请注意,如果常见的FJP没有足够的并行度,它仍然可能无法并行启动所有供应商。要解决此问题,您可以创建自己的池,该池具有所需的并行度级别:

long start = System.currentTimeMillis();
ForkJoinPool fjp = new ForkJoinPool(suppliers.size());
String winner = CompletableFuture
        .anyOf(suppliers.stream().map(s -> CompletableFuture.supplyAsync(s, fjp))
                .toArray(CompletableFuture[]::new)).join().toString();
long duration = System.currentTimeMillis() - start;
assertEquals("fast", winner); // The value is "fast"
assertTrue(duration < 9_000); // The whole process took less than 9 seconds
fjp.shutdownNow();

2
2017-10-06 11:28