C#等待生产者/消费者中的多个事件

前端之家收集整理的这篇文章主要介绍了C#等待生产者/消费者中的多个事件前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我正在使用 producer/consumer pattern实现数据链路层.数据链路层有自己的线程和状态机,通过线路传输数据链路协议(以太网,RS-232 ……).物理层的接口表示为System.IO.Stream.另一个线程将消息写入数据链接对象并从中读取消息.

数据链接对象具有空闲状态,必须等待以下四种情况之一:

>收到一个字节
>网络线程提供了一条消息
>保持活动计时器已过期
>所有通信都被网络层取消

我很难找到最好的方法来实现这一点,而无需将通信分成读/写线程(从而大大增加了复杂性).以下是我如何获得4分中的3分:

  1. // Read a byte from 'stream'. Timeout after 10 sec. Monitor the cancellation token.
  2. stream.ReadTimeout = 10000;
  3. await stream.ReadAsync(buf,1,cts.Token);

要么

  1. BlockingCollection<byte[]> SendQueue = new ...;
  2. ...
  3. // Check for a message from network layer. Timeout after 10 seconds.
  4. // Monitor cancellation token.
  5. SendQueue.TryTake(out msg,10000,cts.Token);

我该怎么做才能阻止线程,等待所有四个条件?欢迎所有建议.我没有设置任何架构或数据结构.

编辑:********感谢大家的帮助.这是我的解决方案********

首先,我认为没有生产者/消费者队列的异步实现.所以我实现了类似于this stackoverflow post的东西.

我需要一个外部和内部取消源来分别停止使用者线程并取消中间任务,similar to this article.

  1. byte[] buf = new byte[1];
  2. using (CancellationTokenSource internalTokenSource = new CancellationTokenSource())
  3. {
  4. CancellationToken internalToken = internalTokenSource.Token;
  5. CancellationToken stopToken = stopTokenSource.Token;
  6. using (CancellationTokenSource linkedCts =
  7. CancellationTokenSource.CreateLinkedTokenSource(stopToken,internalToken))
  8. {
  9. CancellationToken ct = linkedCts.Token;
  10. Task<int> readTask = m_stream.ReadAsync(buf,ct);
  11. Task<byte[]> msgTask = m_sendQueue.DequeueAsync(ct);
  12. Task keepAliveTask = Task.Delay(m_keepAliveTime,ct);
  13.  
  14. // Wait for at least one task to complete
  15. await Task.WhenAny(readTask,msgTask,keepAliveTask);
  16.  
  17. // Next cancel the other tasks
  18. internalTokenSource.Cancel();
  19. try {
  20. await Task.WhenAll(readTask,keepAliveTask);
  21. } catch (OperationCanceledException e) {
  22. if (e.CancellationToken == stopToken)
  23. throw;
  24. }
  25.  
  26. if (msgTask.IsCompleted)
  27. // Send the network layer message
  28. else if (readTask.IsCompleted)
  29. // Process the byte from the physical layer
  30. else
  31. Contract.Assert(keepAliveTask.IsCompleted);
  32. // Send a keep alive message
  33. }
  34. }

解决方法

在这种情况下,我只会使用取消令牌取消.像保持活动计时器这样的重复超时更好地表示为计时器.

所以,我会把它建模为三个可取消的任务.首先,取消令牌:

All communication was cancelled by the network layer

  1. CancellationToken token = ...;

然后,三个并发操作:

A byte is received

  1. var readByteTask = stream.ReadAsync(buf,token);

The keep-alive timer has expired

  1. var keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10),token);

A message is available from the network thread

这个有点棘手.您当前的代码使用BlockingCollection< T>,它不是异步兼容的.我建议切换到TPL Dataflow’s BufferBlock<T>my own AsyncProducerConsumerQueue<T>,其中任何一个都可以用作异步兼容的生产者/消费者队列(意味着生产者可以是同步或异步,消费者可以是同步或异步).

  1. BufferBlock<byte[]> SendQueue = new ...;
  2. ...
  3. var messageTask = SendQueue.ReceiveAsync(token);

然后,您可以使用Task.WhenAny确定完成了哪些任务:

  1. var completedTask = await Task.WhenAny(readByteTask,keepAliveTimerTask,messageTask);

现在,您可以通过将completedTask与其他人进行比较并等待它们来检索结果:

  1. if (completedTask == readByteTask)
  2. {
  3. // Throw an exception if there was a read error or cancellation.
  4. await readByteTask;
  5. var byte = buf[0];
  6. ...
  7. // Continue reading
  8. readByteTask = stream.ReadAsync(buf,token);
  9. }
  10. else if (completedTask == keepAliveTimerTask)
  11. {
  12. // Throw an exception if there was a cancellation.
  13. await keepAliveTimerTask;
  14. ...
  15. // Restart keepalive timer.
  16. keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10),token);
  17. }
  18. else if (completedTask == messageTask)
  19. {
  20. // Throw an exception if there was a cancellation (or the SendQueue was marked as completed)
  21. byte[] message = await messageTask;
  22. ...
  23. // Continue reading
  24. messageTask = SendQueue.ReceiveAsync(token);
  25. }

猜你在找的C#相关文章