是否可以和/或建议在一个对象中使用多个System.Threading.Channels?

我正在开发.net core 3.0 Web应用程序,因此决定在单例服务中使用System.Threading.Channels。我的作用域请求服务的顶层将注入此单例以访问其通道。

我决定使用这种模式将请求(为其他连接的客户端生成实时更新)与这些更新的执行分离。

在一个对象内实现一个通道有许多示例。

有人可以告诉我是否可能/建议在我的单身人士中使用多个渠道吗?

创建单例时,创建多个通道并“启动”它们还没有遇到任何问题。我只是还没到什么程度,我可以与多个客户进行测试,要求他们在单例上点击不同的渠道以查看是否运作良好。 (或完全没有?...)

使用多个渠道的主要动机是我希望单身人士根据渠道中项目的类型来做不同的事情。

public class MyChannelSingleton 
{
    public Channel<MyType> TypeoneChannel = Channel.CreateUnbounded<MyType>();
    public Channel<myotherType> TypeTwoChannel = Channel.CreateUnbounded<myotherType>();

    public MyChannelSingleton() 
    {
        StartChannels();
    }

    private void StartChannels() 
    {
        // discarded async tasks due to calling in ctor
        _ = StartTypeoneChannel();
        _ = StartTypeTwoChannel();
    }

    private async Task StartTypeoneChannel()
    {
        var reader = TypeoneChannel.Reader;

        while (await reader.WaitToReadAsync())
        {
            if (reader.TryRead(out MyType item))
            {
                // item is sucessfully read from channel
            }
        }
    }

    private async Task StartTypeTwoChannel()
    {
        var reader = TypeTwoChannel.Reader;

        while (await reader.WaitToReadAsync())
        {
            if (reader.TryRead(out myotherType item))
            {
                // item is sucessfully read from channel
            }
        }
    }
}

我还希望永远不要“完成”频道,并在应用程序的整个生命周期内保持可用。

michelleyu7 回答:是否可以和/或建议在一个对象中使用多个System.Threading.Channels?

很遗憾,我可以not find the sourcecode。并称Documentation sparse为轻描淡写。所以我最多只能告诉您“如果这是我的课程,我会怎么做”。

在内存中具有多个通道(尤其是无边界)的最大问题将是导致早期OOM的内存碎片。确实,即使没有一个无限的范围,一个大问题也将是必须扩大收藏范围。 List<T>只不过是T[]周围具有自动增长支持的包装器。列表无限制的另一个问题是您早晚run out of indexes

我该如何解决? Linked List。在所有情况下,大约90%的情况下,链表都是我什至会考虑的 last 集合。其余的10%是Queues和Queue like结构。并且频道非常非常像一个队列。在这10%的情况下,在9%的情况下,我只会使用Queue实现所做的任何事情。这是剩余的1%。

对于随机访问,链接列表是最糟糕的集合。对于队列,这是可行的。但是要避免在.NET中与碎片相关的OOM?为了最小化增长成本?为了绕过硬阵列限制?在那里,链接列表是绝对无与伦比的

如果不这样做?可以制作自己的频道版本来执行 并替换它。

,

Channel<T>只是线程安全的异步队列。它本身不执行任何处理,只是一个被动的内存FIFO存储。您可以根据需要拥有任意数量的

您可以利用Channel分别公开ReaderWriter的事实,将类的客户端的访问权限限制为所需的最低功能。换句话说,除了公开Channel<T>类型的属性,您还可以考虑公开ChannelWriter<T>ChannelReader<T>类型的属性。

创建无限制的渠道时也应谨慎。单个滥用的频道可能会使您的应用很容易成为OutOfMemoryException的受害者。

公开ChannelReader<T>类型的属性的另一种方法是公开IAsyncEnumerable<T> s。

,

只要正确使用它们,就可以随意使用。实际上,使用暴露处理管道的后台服务(本质上是单例)是在.NET Core中使用它们的一种非常常见的方法。

通道不仅是异步队列。它们类似于DataFlow块-它们可用于创建处理管道,每个块/工作人员处理来自输入缓冲区/ ChannelReader的数据,并将结果转发到输出缓冲区/ ChannelWriter。 DataFlow块通过任务本身处理异步处理。通过渠道,我们需要自己处理工人的任务。

我们需要牢记的一个非常重要的概念是没有直接访问频道。实际上,在几乎所有情况下,它们甚至都不应公开为字段或属性。在大多数情况下,只需要一个ChannelReader。在某些情况下,例如在管道的顶部,可能会暴露ChannelWriter 。或不。

个人工作者/步骤

典型的工作步骤如下所示

private ChannelReader<MyType2> Step1(ChannelReader<MyType> reader,CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<MyOtherType>();
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
        await foreach(var item from reader.ReadAllAsync(token))
        {
             MyType2 result=........;
             await writer.WriteAsync(result);
        }
    },token).ContinueWith(t=>channel.TryComplete(t));

    return channel.Reader;    
}

一些注意事项:

  • 您可以根据需要创建多个任务,并使用Task.WhenAll等待所有工作人员完成,然后再关闭频道。
  • 如果管道不够快,您可以使用 bounded 通道来防止大量消息堆积。
  • 如果发出取消信号,则从输入通道读取的信息都会被取消。
  • 工作者任务完成时,无论是由于取消还是引发,都将关闭该通道。
  • “头”频道完成后,完成将从一个步骤进行到下一个步骤。

组合步骤

可以通过将一个人的输出阅读器传递给另一个人的输入阅读器来组合多个步骤,例如:

var cts=new CancelaltionTokenSource();

var step1=Step1(headReader,cts.Token);
var step2=Step2(step1,cts.Token);
var step3=Step3(step2,cts.Token);
...
await stepN.Completion;

CancellationTokenSource可用于提前结束管道或设置超时,以防挂起管道。

管道头

“头部”阅读器可能来自“适配器”方法,例如:

private ChannelReader<T> ToChannel(IEnumerable<T> input,CancellationToken token)
{
    var channel=Channel.CreateUnbounded<T>();
    var writer=channel.Writer;
    foreach(var item from input)
    {
        if (token.IsCancellationRequested)
        {
            break;
        }
        writer.TryWrite(result);
    }
    //No-one else is going to complete this channel
    channel.Complete();
    return channel.Reader;    
}

对于后台服务,我们可以使用服务方法将输入“发布”到头通道,例如:

class MyService
{
    Channel<MyType0> _headChannel;

    public MyService()
    {
        _headChannel=Channel.CreateBounded<MyType0>(5);
    }

    public async Task ExecuteAsync(CancellationToken token)
    {
        var step1=Step1(_headChannel.Reader,token);
        var step2=Step2(step1,token);        
        await step2.Completion;
    }

    public Task PostAsync(MyType0 input)
    {
        return _headChannel.Writer.WriteAsync(input);
    }

    public Stop()
    {
        _headChannel.Writer.TryComplete();
    }

...

}

我故意使用看起来像BackgroundService method names的方法名称。 StartAsync或ExecuteAsync可用于设置管道。 StopAsync可用于指示其完成,例如,当最终用户点击 Ctrl + C 时。

queued BackgroundService示例中显示的另一种有用的技术是注册一个接口,客户端可以使用该接口来发布消息,而不是直接访问服务类,例如:

interface IQueuedService<T>
{
    Task PostAsync(T input);
}

与System.Linq.Async结合

ReadAllAsync()方法返回一个IAsyncEnumerable<T>,这意味着我们可以在System.Linq.Async中使用运算符,例如Where或Take来过滤,批处理或转换消息,例如:

private ChannelReader<MyType> ActiveOnly(ChannelReader<MyType> reader,CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<MyType>();
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
        var inpStream=reader.ReadAllAsync(token)
                            .Where(it=>it.IsActive);
        await foreach(var item from inpStream)
        {
             await writer.WriteAsync(item);
        }
    },token).ContinueWith(t=>channel.TryComplete(t));

    return channel.Reader;    
}
本文链接:https://www.f2er.com/3121806.html

大家都在问