考虑以下代码

extern crate futures; // v0.1 (old)

use std::sync::{atomic, Arc};
use futures::*;

struct F(Arc<atomic::AtomicBool>);

impl Future for F {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        println!("Check if flag is set");
        if self.0.load(atomic::Ordering::Relaxed) {
            Ok(Async::Ready(()))
        } else {
            Ok(Async::NotReady)
        }
    }
}

fn main() {
    let flag = Arc::new(atomic::AtomicBool::new(false));
    let future = F(flag.clone());
    ::std::thread::spawn(move || {
        ::std::thread::sleep_ms(10);
        println!("set flag");
        flag.store(true, atomic::Ordering::Relaxed);
    });
    // ::std::thread::sleep_ms(20);
    let result = future.wait();
    println!("result: {:?}", result);
}

产生的线程设置一个标志,将来等待。
我们还休眠产生的线程,因此从.poll()进行的初始.wait()调用是在设置标志之前。这导致.wait()无限期地(似乎)阻塞。如果我们取消注释其他thread::sleep_ms,则.wait()返回,并打印出结果(())。

我希望当前线程尝试通过多次调用poll来解决 future 问题,因为我们阻塞了当前线程。但是,这没有发生。

我尝试读取some docs,似乎问题是第一次从park获得NotReady后,该线程被poll ed了。但是,我不清楚为什么会这样,或者如何解决这个问题。

我想念什么?

最佳答案

为什么您需要停放一个等待的将来而不是反复轮询呢?答案很明显,恕我直言。因为归根结底,它更快,更高效!

要反复查询一个 future (可能被称为“忙碌等待”),图书馆将不得不决定是经常还是很少这样做,而答案都不能令人满意。经常执行此操作会浪费CPU周期,很少执行,并且代码响应速度很慢。

所以,是的,您需要在等待某事时将其停放,然后在等待完成后将其取消停放。像这样:

#![allow(deprecated)]

extern crate futures;

use std::sync::{Arc, Mutex};
use futures::*;
use futures::task::{park, Task};

struct Status {
    ready: bool,
    task: Option<Task>,
}

#[allow(dead_code)]
struct F(Arc<Mutex<Status>>);

impl Future for F {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        println!("Check if flag is set");
        let mut status = self.0.lock().expect("!lock");
        if status.ready {
            Ok(Async::Ready(()))
        } else {
            status.task = Some(park());
            Ok(Async::NotReady)
        }
    }
}

#[test]
fn test() {
    let flag = Arc::new(Mutex::new(Status {
                                       ready: false,
                                       task: None,
                                   }));
    let future = F(flag.clone());
    ::std::thread::spawn(move || {
        ::std::thread::sleep_ms(10);
        println!("set flag");
        let mut status = flag.lock().expect("!lock");
        status.ready = true;
        if let Some(ref task) = status.task {
            task.unpark()
        }
    });
    let result = future.wait();
    println!("result: {:?}", result);
}

请注意,Future::poll在这里正在做几件事:它正在检查外部条件并且正在暂挂任务,因此有可能进行比赛,例如:
  • poll检查变量,发现它是false
  • 外部代码将变量设置为true
  • 外部代码检查任务是否已停放,并发现任务没有停放;
  • poll可以暂挂任务,但是热闹非凡!为时已晚,没有人会再将其停放。

  • 为了避免任何竞赛,我使用了Mutex来同步这些交互。

    P.S.如果您只需要将线程结果包装到Future中,则考虑使用 oneshot channel :它具有已经实现Receiver接口(interface)的 Future

    关于multithreading - 为什么返回 `Future::poll`后不反复调用 `NotReady`?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/43503953/

    10-12 00:31