很遗憾,我可以not find the sourcecode。并称Documentation sparse为轻描淡写。所以我最多只能告诉您“如果这是我的课程,我会怎么做”。
在内存中具有多个通道(尤其是无边界)的最大问题将是导致早期OOM的内存碎片。确实,即使没有一个无限的范围,一个大问题也将是必须扩大收藏范围。 List<T>
只不过是T[]
周围具有自动增长支持的包装器。列表无限制的另一个问题是您早晚run out of indexes。
我该如何解决? Linked List。在所有情况下,大约90%的情况下,链表都是我什至会考虑的 last 集合。其余的10%是Queues和Queue like结构。并且频道非常非常像一个队列。在这10%的情况下,在9%的情况下,我只会使用Queue实现所做的任何事情。这是剩余的1%。
对于随机访问,链接列表是最糟糕的集合。对于队列,这是可行的。但是要避免在.NET中与碎片相关的OOM?为了最小化增长成本?为了绕过硬阵列限制?在那里,链接列表是绝对无与伦比的。
如果不这样做?可以制作自己的频道版本来执行 并替换它。
,
Channel<T>
只是线程安全的异步队列。它本身不执行任何处理,只是一个被动的内存FIFO存储。您可以根据需要拥有任意数量的
。
您可以利用Channel
分别公开Reader
和Writer
的事实,将类的客户端的访问权限限制为所需的最低功能。换句话说,除了公开Channel<T>
类型的属性,您还可以考虑公开ChannelWriter<T>
或ChannelReader<T>
类型的属性。
创建无限制的渠道时也应谨慎。单个滥用的频道可能会使您的应用很容易成为OutOfMemoryException
的受害者。
公开ChannelReader<T>
类型的属性的另一种方法是公开IAsyncEnumerable<T>
s。
,
只要正确使用它们,就可以随意使用。实际上,使用暴露处理管道的后台服务(本质上是单例)是在.NET Core中使用它们的一种非常常见的方法。
通道不仅是异步队列。它们类似于DataFlow块-它们可用于创建处理管道,每个块/工作人员处理来自输入缓冲区/ ChannelReader的数据,并将结果转发到输出缓冲区/ ChannelWriter。 DataFlow块通过任务本身处理异步处理。通过渠道,我们需要自己处理工人的任务。
我们需要牢记的一个非常重要的概念是没有直接访问频道。实际上,在几乎所有情况下,它们甚至都不应公开为字段或属性。在大多数情况下,只需要一个ChannelReader。在某些情况下,例如在管道的顶部,可能会暴露ChannelWriter 。或不。
个人工作者/步骤
典型的工作步骤如下所示
private ChannelReader<MyType2> Step1(ChannelReader<MyType> reader,CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<MyOtherType>();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
await foreach(var item from reader.ReadAllAsync(token))
{
MyType2 result=........;
await writer.WriteAsync(result);
}
},token).ContinueWith(t=>channel.TryComplete(t));
return channel.Reader;
}
一些注意事项:
- 您可以根据需要创建多个任务,并使用
Task.WhenAll
等待所有工作人员完成,然后再关闭频道。
- 如果管道不够快,您可以使用 bounded 通道来防止大量消息堆积。
- 如果发出取消信号,则从输入通道和读取的信息都会被取消。
- 工作者任务完成时,无论是由于取消还是引发,都将关闭该通道。
- “头”频道完成后,完成将从一个步骤进行到下一个步骤。
组合步骤
可以通过将一个人的输出阅读器传递给另一个人的输入阅读器来组合多个步骤,例如:
var cts=new CancelaltionTokenSource();
var step1=Step1(headReader,cts.Token);
var step2=Step2(step1,cts.Token);
var step3=Step3(step2,cts.Token);
...
await stepN.Completion;
CancellationTokenSource可用于提前结束管道或设置超时,以防挂起管道。
管道头
“头部”阅读器可能来自“适配器”方法,例如:
private ChannelReader<T> ToChannel(IEnumerable<T> input,CancellationToken token)
{
var channel=Channel.CreateUnbounded<T>();
var writer=channel.Writer;
foreach(var item from input)
{
if (token.IsCancellationRequested)
{
break;
}
writer.TryWrite(result);
}
//No-one else is going to complete this channel
channel.Complete();
return channel.Reader;
}
对于后台服务,我们可以使用服务方法将输入“发布”到头通道,例如:
class MyService
{
Channel<MyType0> _headChannel;
public MyService()
{
_headChannel=Channel.CreateBounded<MyType0>(5);
}
public async Task ExecuteAsync(CancellationToken token)
{
var step1=Step1(_headChannel.Reader,token);
var step2=Step2(step1,token);
await step2.Completion;
}
public Task PostAsync(MyType0 input)
{
return _headChannel.Writer.WriteAsync(input);
}
public Stop()
{
_headChannel.Writer.TryComplete();
}
...
}
我故意使用看起来像BackgroundService method names的方法名称。 StartAsync或ExecuteAsync可用于设置管道。 StopAsync可用于指示其完成,例如,当最终用户点击 Ctrl + C 时。
queued BackgroundService示例中显示的另一种有用的技术是注册一个接口,客户端可以使用该接口来发布消息,而不是直接访问服务类,例如:
interface IQueuedService<T>
{
Task PostAsync(T input);
}
与System.Linq.Async结合
ReadAllAsync()
方法返回一个IAsyncEnumerable<T>
,这意味着我们可以在System.Linq.Async中使用运算符,例如Where或Take来过滤,批处理或转换消息,例如:
private ChannelReader<MyType> ActiveOnly(ChannelReader<MyType> reader,CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<MyType>();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
var inpStream=reader.ReadAllAsync(token)
.Where(it=>it.IsActive);
await foreach(var item from inpStream)
{
await writer.WriteAsync(item);
}
},token).ContinueWith(t=>channel.TryComplete(t));
return channel.Reader;
}
本文链接:https://www.f2er.com/3121806.html