在返回带取消的IAsyncEnumerable的函数中迭代IAsyncEnumerable

正如标题所述,我必须执行以下功能:

public async IAsyncEnumerable<Job> GetByPipeline(int pipelineId,[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    await foreach (var job in context.Jobs.Where(job => job.Pipeline.Id == pipelineId)
        .AsAsyncEnumerable()
        .WithCancellation(cancellationToken)
        .ConfigureAwait(false))
    {
        yield return job;
    }
}

我无法确定取消令牌的去向,也无法在太多地方使用它。

当您解构所有花哨的异步内容时,这里实际上发生了什么?还有编写此函数的更好方法吗?

businiao555 回答:在返回带取消的IAsyncEnumerable的函数中迭代IAsyncEnumerable

对于初学者来说,该方法可以简化为:

public IAsyncEnumerable<Job> GetByPipeline(int pipelineId)
{
    return context.Jobs
                  .Where(job => job.Pipeline.Id == pipelineId)
                  .AsAsyncEnumerable();
}

甚至

public IAsyncEnumerable<Job> GetByPipeline(int pipelineId)
    => context.Jobs
              .Where(job => job.Pipeline.Id == pipelineId)
              .AsAsyncEnumerable();

该方法对job不做任何事情,因此不需要对其进行迭代。

取消

如果该方法实际上使用了job,应该在哪里使用取消令牌呢?

让我们稍微整理一下方法。等效为:

public async IAsyncEnumerable<Job> GetByPipeline(
      int pipelineId,[EnumeratorCancellation] CancellationToken ct = default)
{
    //Just a query,doesn't execute anything
    var query =context.Jobs.Where(job => job.Pipeline.Id == pipelineId);

    //Executes the query and returns the *results* as soon as they arrive in an async stream
    var jobStream=query.AsAsyncEnumerable();

    //Process the results from the async stream as they arrive
    await foreach (var job in jobStream.WithCancellation(ct).ConfigureAwait(false))
    {
        //Does *that* need cancelling?
        DoSometingExpensive(job);
    }
}

IQueryable query不执行任何操作,它代表查询。不需要取消。

AsAsyncEnumerable()AsEnumerable()ToList()执行查询并返回一些结果。 ToList()等消耗所有结果,而As...Enumerable()方法仅在需要时才产生结果。该查询无法取消,As_Enumerable()方法除非有要求,否则不会返回任何内容,因此它们不需要取消。

await foreach将遍历整个异步流,因此,如果我们想中止它,我们要做需要传递取消令牌。

最后,DoSometingExpensive(job);是否需要取消?它是否太昂贵了,以至于如果我们花费太长时间,我们希望能够退出它?或者我们可以等到它完成才退出循环吗?如果需要取消,则也需要CancellationToken。

ConfigureAwait

最后,ConfigureAwait(false)不涉及取消,并且可能根本不需要。如果没有它,则每次执行await后都会返回到原始同步上下文。在桌面应用程序中,这意味着UI线程。这就是允许我们在异步事件处理程序中修改UI的原因。

如果GetByPipeline在桌面应用程序上运行并想要修改UI,则必须删除ConfugureAwait

await foreach (var job in jobStream.WithCancellation(ct))
{
        //Update the UI
        toolStripProgressBar.Increment(1);
        toolStripStatusLabel.Text=job.Name;
        //Do the actual job
        DoSometingExpensive(job);
}

使用ConfigureAwait(false),在线程池线程上继续执行,而我们不能触摸UI。

库代码不应影响执行恢复的方式,因此大多数库使用ConfigureAwait(false)并将最终决定权留给UI开发人员。

如果GetByPipeline是一种库方法,请使用ConfigureAwait(false)

,

想象一下,实体框架内部的某个地方是方法GetJobs,它从数据库中检索Job对象:

private static async IAsyncEnumerable<Job> GetJobs(DbDataReader dataReader,[EnumeratorCancellation]CancellationToken cancellationToken = default)
{
    while (await dataReader.ReadAsync(cancellationToken))
    {
        yield return new Job()
        {
            Id = (int)dataReader["Id"],Data = (byte[])dataReader["Data"]
        };
    }
}

现在想象一下Data属性包含一个巨大的字节数组,其中数据伴随着Job。检索每个Job的数组可能会花费一些不小的时间。在这种情况下,中断两次迭代之间的循环是不够的,因为在调用Cancel方法和提高OperationCanceledException之间会有明显的延迟。这就是方法DbDataReader.ReadAsync需要CancellationToken以便可以立即取消查询的原因。

现在的挑战是,当CancellationToken之类的属性沿途时,如何将客户端代码传递的GetJobs传递给context.Jobs方法。解决方案是WithCancellation扩展方法,该方法存储令牌并将令牌更深地传递给接受用EnumeratorCancellation属性修饰的参数的方法。

因此,对于您而言,您已正确完成了所有操作。建议在您的cancellationToken返回方法中加入一个IAsyncEnumerable自变量。这样,链接到您的WithCancellation方法的后续GetByPipeline不会被浪费。然后,将WithCancellation链接到方法内部的AsAsyncEnumerable之后,这也是正确的。否则,CancellationToken不会到达其最终目的地,即GetJobs方法。

本文链接:https://www.f2er.com/3141272.html

大家都在问