
example
public void test() throws Exception { ExecutorService threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); List<Future> futureList = new ArrayList<>(); for (int i = 0; i < 10; i++) { Future future = threadPoolExecutor.submit(() -> { System.out.println("do some work"); int time = new Random().nextInt(2000); try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } }); futureList.add(future); } for (Future future : futureList) { future.get(1000, TimeUnit.MILLISECONDS); } } 我想在 1 秒内,批量查询,如果某次查询超时,就不要结果。最后获取所有成功的查询结果
现在的写法是有问题的,每次从 futureList 获取结果都是阻塞的,最终结果肯定是大于 1 秒的,有没有好办法或者轮子?
1 Ariver 2021-04-16 17:04:43 +08:00 你需要 Reactor. |
2 DanielGuo 2021-04-16 17:06:22 +08:00 public void test() throws Exception { ExecutorService threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); List<Future> futureList = new ArrayList<>(); for (int i = 0; i < 10; i++) { Future future = threadPoolExecutor.submit(() -> { System.out.println("do some work"); int time = new Random().nextInt(2000); try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } }); futureList.add(future); } Thread.sleep(1000); boolean allDOne= futureList.stream().map(f -> f.isDone()).allMatch(result -> result == true); if (allDone) { for (Future future : futureList) { future.get(); } } } |
3 DanielGuo 2021-04-16 17:07:04 +08:00 等待一秒,判断是否全部完成,然后获取结果。。。 |
4 guxingke 2021-04-16 17:08:08 +08:00 CompletableFuture.allOf(f1,f2...fn).get(timeout) > 也许可以,没验证 |
5 zhangslob669 OP @DanielGuo 这样就必须强制等待了,并不是一种优雅的做法;而且项目里不允许写 Thread.sleep(1000);等代码 |
7 securityCoding 2021-04-16 17:19:50 +08:00 Completablefuture 直接用这个 |
8 SlipStupig 2021-04-16 17:21:51 +08:00 结果用异步回调 |
9 xiaoxinshiwo 2021-04-16 17:33:37 +08:00 CountDownLatch 不香吗 |
10 blisteringsands 2021-04-16 19:42:10 +08:00 submit()之后取一下当前时间,续 1 秒算出 deadline 每次 future.get 之前重新取当前时间,和 deadline 减一下算出等待时间 |
11 zzl22100048 2021-04-16 19:56:01 +08:00 via iPhone 你这要求完美符合 completablefuture |
12 zhady009 2021-04-16 19:59:52 +08:00 CompletableFuture 有个 completeOnTimeout 超时的时候可以设置默认值给个 null 最后过滤掉为 null 的 |
13 zhady009 2021-04-16 20:09:49 +08:00 ```java @Test public void demo() { QueryTask var0 = new QueryTask(900); QueryTask var1 = new QueryTask(2100); QueryTask var2 = new QueryTask(2000); QueryTask var3 = new QueryTask(2000); Demo<QueryTask, Integer> test = new Demo<>(1000, List.of(var0, var1, var2, var3)); long l = System.currentTimeMillis(); Collection<Integer> d = test.execute(); System.out.println(System.currentTimeMillis() - l); assert d.size() > 0; for (Integer integer : d) { assert integer <= 1000; } } static class Demo<T extends Supplier<E>, E> { private static final ExecutorService ES = Executors.newFixedThreadPool(10); private final int timeout; private final Collection<T> tasks; Demo(int timeout, Collection<T> tasks) { this.timeout = timeout; this.tasks = tasks; } public List<E> execute() { List<CompletableFuture<E>> collect = tasks.stream().map(x -> CompletableFuture.supplyAsync(x, ES) .completeOnTimeout(null, timeout, TimeUnit.MILLISECONDS)) .collect(Collectors.toUnmodifiableList()); CompletableFuture<List<E>> listCompletableFuture = CompletableFuture.allOf(collect.toArray(new CompletableFuture[collect.size()])) .thenApply(v -> collect.stream().map(CompletableFuture::join) .filter(Objects::nonNull) .collect(Collectors.toList())); return listCompletableFuture.join(); } } static class QueryTask implements Supplier<Integer> { private final int time; QueryTask(int time) { this.time = time; } @Override public Integer get() { try { //query Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } return time; } } ``` |
14 zhgg0 2021-04-17 00:37:05 +08:00 via iPhone 用线程池的 invokeAll 方法。 或者 timeout 每次 get 前实时算。 |
15 yazinnnn 2021-04-17 18:28:55 +08:00 val list = (0..9).map { async { withTimeoutOrNull(1000) { val lOng= Random.nextLong(2000) delay(long) it } } } println(list.map { it.await() }) [0, 1, 2, 3, null, 5, 6, null, null, 9] kotlin 协程可以简单实现... 或者 jdk11 用 CompletableFuture 或者 jdk8 用一下 vertx 的 Promise api... fun main() { println("start ${Date()}") foo() println("end ${Date()}") } var threadPoolExecutor = ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS, LinkedBlockingQueue()) fun foo() { val future = (0..9).map { getSomething() } println(future.map { it.result() }) Thread.sleep(1000) println(future.map { it.result() }) } fun getSomething(): Future<String> { val promise = Promise.promise<String>() threadPoolExecutor.execute { Thread.sleep(Random.nextLong(1500)) val result = Random.nextLong(3000).toString() promise.complete(result) } return promise.future() } start Sat Apr 17 17:56:12 CST 2021 [null, null, null, null, null, null, null, null, null, null] [2255, null, 2370, 750, 1399, 2796, null, null, 39, null] end Sat Apr 17 17:56:13 CST 2021 不过这样无法取消任务... |