我正在使用SemaphoreSlim
with a FIFO behaviour,现在我想向其中添加一个Starve(int amount)
方法以从池中删除线程,这与Release()
相反
如果有任何正在运行的任务,它们当然会一直持续到完成为止,因为此刻信号灯无法跟踪实际运行的内容,并且“欠”该信号灯一个释放调用。
原因是用户将随时动态控制给定信号量所允许的进程数。
我遵循的策略是:
- 如果有可用线程,即
CurrentCount > 0
,则在不释放回来的情况下在SemaphoreSlim上调用Await()
。 - 如果没有更多的线程可用,因为可能正在运行任务,甚至可能正在排队,那么下次调用
Release()
时,请忽略它以防止线程被释放(一个int变量保持计数)
我已经添加了下面到目前为止的代码。我一直在努力的主要问题是如何确保线程安全,没有死锁和没有令人惊讶的竞争条件。
鉴于我无法访问信号量的private lock(),我创建了一个新对象,以至少尝试防止多个线程同时(在包装器内)操纵新变量。
但是,我担心SemaphoreSlim中的其他变量(例如CurrentCount
)也可能会改变一半,并使事情变得混乱……我希望Release()
方法中的锁可以防止更改到CurrentCount
,但也许我还应该将锁应用于Wait和WaitAsync(这也可能会更改CurrentCount)?这也可能导致两次调用Wait(?)的不必要的锁定
在这种情况下,对semaphore.Wait()
的呼叫比await semaphore.WaitAsync()
好还是差?
是否有更好的方法来扩展诸如SemaphoreSlim之类的功能,该类包含许多可能需要的私有变量或对访问有用的私有变量?
我简要地考虑过创建一个从SemaphoreSlim继承的新类,或者研究扩展方法,也许使用反射来访问私有变量,但是似乎没有一个明显的或有效的。
public class SemaphoreQueue
{
private SemaphoreSlim semaphore;
private ConcurrentQueue<TaskCompletionSource<bool>> queue = new ConcurrentQueue<TaskCompletionSource<bool>>();
private int releasesToIgnore;
private object lockObj;
private const int NO_MAXIMUM = Int32.MaxValue; // cannot access SemaphoreSlim.NO_MAXIMUM
public SemaphoreQueue(int initialCount) : this(initialCount,NO_MAXIMUM) { }
public SemaphoreQueue(int initialCount,int maxCount)
{
semaphore = new SemaphoreSlim(initialCount,maxCount);
lockObj = new object();
releasesToIgnore = 0;
}
public void Starve(int amount)
{
lock (lockObj)
{
// a maximum of CurrentCount threads can be immediatelly starved by calling Wait without release
while ((semaphore.CurrentCount > 0) && (amount > 0))
{
semaphore.Wait();
amount -= 1;
}
// presumably there are still tasks running. The next Releases will be ignored.
if (amount > 0)
releasesToIgnore += amount;
}
}
public int Release()
{
return Release(1);
}
public int Release(int num)
{
lock (lockObj)
{
if (releasesToIgnore > num)
{
releasesToIgnore -= num;
return semaphore.CurrentCount;
}
else
{
int oldReleasesToIgnore = releasesToIgnore;
releasesToIgnore = 0;
return semaphore.Release(num - oldReleasesToIgnore);
}
}
}
public void Wait(CancellationToken token)
{
WaitAsync(token).Wait();
}
public Task WaitAsync(CancellationToken token)
{
var tcs = new TaskCompletionSource<bool>();
queue.Enqueue(tcs);
QueuedAwait(token);
return tcs.Task;
}
public int CurrentCount { get => this.semaphore.CurrentCount; }
private void QueuedAwait(CancellationToken token)
{
semaphore.WaitAsync(token).ContinueWith(t =>
{
TaskCompletionSource<bool> popped;
if (queue.TryDequeue(out popped))
popped.SetResult(true);
});
}
public void Dispose()
{
semaphore.Dispose();
}
}