有什么方法可以创建异步流生成器,从而产生重复调用函数的结果?

我想构建一个收集天气更新并将其表示为流的程序。我想无限循环地调用get_weather(),在完成开始之间有60秒的延迟。

简化版本如下:

async fn get_weather() -> Weather { /* ... */ }

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    loop {
        tokio::timer::delay_for(std::time::Duration::from_secs(60)).await;
        let weather = get_weather().await;
        yield weather; // This is not supported
        // Note: waiting for get_weather() stops the timer and avoids overflows.
    }
}

有什么方法可以轻松地做到这一点吗?

tokio::timer::Interval花费60秒以上时,无法使用get_weather()

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    tokio::timer::Interval::new_with_delay(std::time::Duration::from_secs(60))
        .then(|| get_weather())
}

如果发生这种情况,下一个功能将立即启动。我想在上一个get_weather()开始和下一个get_weather()开始之间保持60秒的时间。

solinde 回答:有什么方法可以创建异步流生成器,从而产生重复调用函数的结果?

使用stream::unfold从“期货世界”转到“信息流世界”。我们不需要任何额外的状态,因此我们使用空元组:

use std::time::Duration;
use tokio::timer;

struct Weather;

async fn get_weather() -> Weather {
    Weather
}

const BETWEEN: Duration = Duration::from_secs(1);

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    futures::stream::unfold((),|_| {
        async {
            timer::delay_for(BETWEEN).await;
            let weather = get_weather().await;
            Some((weather,()))
        }
    })
}

use futures::StreamExt;

#[tokio::main]
async fn main() {
    get_weather_stream()
        .take(3)
        .for_each(|_v| {
            async {
                println!("Got the weather");
            }
        })
        .await;
}
% time ./target/debug/example

Got the weather
Got the weather
Got the weather

real    3.013   3013370us
user    0.004   3763us
sys     0.003   2604us
[dependencies]
futures-preview = "0.3.0-alpha.19"
tokio = "0.2.0-alpha.6"

经过rustc 1.39.0-beta.7测试

另请参阅:

本文链接:https://www.f2er.com/3163875.html

大家都在问