数据链接对象具有空闲状态,必须等待以下四种情况之一:
>收到一个字节
>网络线程提供了一条消息
>保持活动计时器已过期
>所有通信都被网络层取消
我很难找到最好的方法来实现这一点,而无需将通信分成读/写线程(从而大大增加了复杂性).以下是我如何获得4分中的3分:
- // Read a byte from 'stream'. Timeout after 10 sec. Monitor the cancellation token.
- stream.ReadTimeout = 10000;
- await stream.ReadAsync(buf,1,cts.Token);
要么
- BlockingCollection<byte[]> SendQueue = new ...;
- ...
- // Check for a message from network layer. Timeout after 10 seconds.
- // Monitor cancellation token.
- SendQueue.TryTake(out msg,10000,cts.Token);
我该怎么做才能阻止线程,等待所有四个条件?欢迎所有建议.我没有设置任何架构或数据结构.
编辑:********感谢大家的帮助.这是我的解决方案********
首先,我认为没有生产者/消费者队列的异步实现.所以我实现了类似于this stackoverflow post的东西.
我需要一个外部和内部取消源来分别停止使用者线程并取消中间任务,similar to this article.
- byte[] buf = new byte[1];
- using (CancellationTokenSource internalTokenSource = new CancellationTokenSource())
- {
- CancellationToken internalToken = internalTokenSource.Token;
- CancellationToken stopToken = stopTokenSource.Token;
- using (CancellationTokenSource linkedCts =
- CancellationTokenSource.CreateLinkedTokenSource(stopToken,internalToken))
- {
- CancellationToken ct = linkedCts.Token;
- Task<int> readTask = m_stream.ReadAsync(buf,ct);
- Task<byte[]> msgTask = m_sendQueue.DequeueAsync(ct);
- Task keepAliveTask = Task.Delay(m_keepAliveTime,ct);
- // Wait for at least one task to complete
- await Task.WhenAny(readTask,msgTask,keepAliveTask);
- // Next cancel the other tasks
- internalTokenSource.Cancel();
- try {
- await Task.WhenAll(readTask,keepAliveTask);
- } catch (OperationCanceledException e) {
- if (e.CancellationToken == stopToken)
- throw;
- }
- if (msgTask.IsCompleted)
- // Send the network layer message
- else if (readTask.IsCompleted)
- // Process the byte from the physical layer
- else
- Contract.Assert(keepAliveTask.IsCompleted);
- // Send a keep alive message
- }
- }
解决方法
所以,我会把它建模为三个可取消的任务.首先,取消令牌:
All communication was cancelled by the network layer
- CancellationToken token = ...;
然后,三个并发操作:
A byte is received
- var readByteTask = stream.ReadAsync(buf,token);
The keep-alive timer has expired
- var keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10),token);
A message is available from the network thread
这个有点棘手.您当前的代码使用BlockingCollection< T>,它不是异步兼容的.我建议切换到TPL Dataflow’s BufferBlock<T>
或my own AsyncProducerConsumerQueue<T>
,其中任何一个都可以用作异步兼容的生产者/消费者队列(意味着生产者可以是同步或异步,消费者可以是同步或异步).
- BufferBlock<byte[]> SendQueue = new ...;
- ...
- var messageTask = SendQueue.ReceiveAsync(token);
然后,您可以使用Task.WhenAny确定完成了哪些任务:
- var completedTask = await Task.WhenAny(readByteTask,keepAliveTimerTask,messageTask);
现在,您可以通过将completedTask与其他人进行比较并等待它们来检索结果:
- if (completedTask == readByteTask)
- {
- // Throw an exception if there was a read error or cancellation.
- await readByteTask;
- var byte = buf[0];
- ...
- // Continue reading
- readByteTask = stream.ReadAsync(buf,token);
- }
- else if (completedTask == keepAliveTimerTask)
- {
- // Throw an exception if there was a cancellation.
- await keepAliveTimerTask;
- ...
- // Restart keepalive timer.
- keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10),token);
- }
- else if (completedTask == messageTask)
- {
- // Throw an exception if there was a cancellation (or the SendQueue was marked as completed)
- byte[] message = await messageTask;
- ...
- // Continue reading
- messageTask = SendQueue.ReceiveAsync(token);
- }