为SemaphoreSlim实现Starve方法(“ Unrelease” /“ Hold”)

我正在使用SemaphoreSlim with a FIFO behaviour,现在我想向其中添加一个Starve(int amount)方法以从池中删除线程,这与Release()相反

如果有任何正在运行的任务,它们当然会一直持续到完成为止,因为此刻信号灯无法跟踪实际运行的内容,并且“欠”​​该信号灯一个释放调用。

原因是用户将随时动态控制给定信号量所允许的进程数。

我遵循的策略是:

  • 如果有可用线程,即CurrentCount > 0,则在不释放回来的情况下在SemaphoreSlim上调用Await()
  • 如果没有更多的线程可用,因为可能正在运行任务,甚至可能正在排队,那么下次调用Release()时,请忽略它以防止线程被释放(一个int变量保持计数)

我已经添加了下面到目前为止的代码。我一直在努力的主要问题是如何确保线程安全,没有死锁和没有令人惊讶的竞争条件。

鉴于我无法访问信号量的private lock(),我创建了一个新对象,以至少尝试防止多个线程同时(在包装器内)操纵新变量。

但是,我担心SemaphoreSlim中的其他变量(例如CurrentCount)也可能会改变一半,并使事情变得混乱……我希望Release()方法中的锁可以防止更改到CurrentCount,但也许我还应该将锁应用于Wait和WaitAsync(这也可能会更改CurrentCount)?这也可能导致两次调用Wait(?)的不必要的锁定

在这种情况下,对semaphore.Wait()的呼叫比await semaphore.WaitAsync()好还是差?

是否有更好的方法来扩展诸如SemaphoreSlim之类的功能,该类包含许多可能需要的私有变量或对访问有用的私有变量?

我简要地考虑过创建一个从SemaphoreSlim继承的新类,或者研究扩展方法,也许使用反射来访问私有变量,但是似乎没有一个明显的或有效的。

    public class SemaphoreQueue
    {
        private SemaphoreSlim semaphore;
        private ConcurrentQueue<TaskCompletionSource<bool>> queue = new ConcurrentQueue<TaskCompletionSource<bool>>();

        private int releasesToIgnore;
        private object lockObj;
        private const int NO_MAXIMUM = Int32.MaxValue; // cannot access SemaphoreSlim.NO_MAXIMUM
        public SemaphoreQueue(int initialCount) : this(initialCount,NO_MAXIMUM) { }
        public SemaphoreQueue(int initialCount,int maxCount)
        {
            semaphore = new SemaphoreSlim(initialCount,maxCount);
            lockObj = new object();
            releasesToIgnore = 0;
        }
        public void Starve(int amount)
        {
            lock (lockObj)
            {
                // a maximum of CurrentCount threads can be immediatelly starved by calling Wait without release
                while ((semaphore.CurrentCount > 0) && (amount > 0))
                {
                    semaphore.Wait();
                    amount -= 1;
                }
                // presumably there are still tasks running. The next Releases will be ignored.
                if (amount > 0)
                    releasesToIgnore += amount;
            }
        }
        public int Release()
        {
            return Release(1);
        }
        public int Release(int num)
        {
            lock (lockObj)
            {
                if (releasesToIgnore > num)
                {
                    releasesToIgnore -= num;
                    return semaphore.CurrentCount;
                }
                else
                {
                    int oldReleasesToIgnore = releasesToIgnore;
                    releasesToIgnore = 0;
                    return semaphore.Release(num - oldReleasesToIgnore);
                }
            }
        }
        public void Wait(CancellationToken token)
        {
            WaitAsync(token).Wait();
        }
        public Task WaitAsync(CancellationToken token)
        {
            var tcs = new TaskCompletionSource<bool>();
            queue.Enqueue(tcs);
            QueuedAwait(token);
            return tcs.Task;
        }
        public int CurrentCount { get => this.semaphore.CurrentCount; }
        private void QueuedAwait(CancellationToken token)
        {
            semaphore.WaitAsync(token).ContinueWith(t =>
            {
                TaskCompletionSource<bool> popped;
                if (queue.TryDequeue(out popped))
                    popped.SetResult(true);
            });
        }

        public void Dispose()
        {
            semaphore.Dispose();
        }
    }
erma001 回答:为SemaphoreSlim实现Starve方法(“ Unrelease” /“ Hold”)

我认为在SemaphoreSlim类的顶部实现自定义信号是有问题的,因为我们无法访问内置实现所使用的同步原语。因此,我建议仅使用TaskCompletionSource对象队列来实现它。以下是缺少功能的基本实现。 WaitAsync方法缺少取消功能,而Release方法也缺少releaseCount参数。

为简单起见,不使用releasesToIgnore计数器,而是允许现有的currentCount具有负值。 Starve方法只会减少此计数器。

public class SemaphoreFifo
{
    private readonly Queue<TaskCompletionSource<bool>> _queue
        = new Queue<TaskCompletionSource<bool>>();
    private readonly object _locker = new object();
    private readonly int _maxCount;
    private int _currentCount;

    public SemaphoreFifo(int initialCount,int maxCount)
    {
        _currentCount = initialCount;
        _maxCount = maxCount;
    }
    public SemaphoreFifo(int initialCount) : this(initialCount,Int32.MaxValue) { }

    public int CurrentCount { get { lock (_locker) return _currentCount; } }

    public async Task WaitAsync()
    {
        TaskCompletionSource<bool> tcs;
        lock (_locker)
        {
            if (_currentCount > 0)
            {
                _currentCount--;
                return;
            }
            tcs = new TaskCompletionSource<bool>();
            _queue.Enqueue(tcs);
        }
        await tcs.Task;
    }

    public void Starve(int starveCount)
    {
        lock (_locker) _currentCount -= starveCount;
    }

    public void Release()
    {
        TaskCompletionSource<bool> tcs;
        lock (_locker)
        {
            if (_currentCount < 0)
            {
                _currentCount++;
                return;
            }
            if (_queue.Count == 0)
            {
                if (_currentCount >= _maxCount) throw new SemaphoreFullException();
                _currentCount++;
                return;
            }
            tcs = _queue.Dequeue();
        }
        tcs.SetResult(true);
    }
}
本文链接:https://www.f2er.com/2883205.html

大家都在问