你怎么能在RX中做一个简单的,有状态的序列变换?
假设我们想要对IObservable noisySequence进行指数移动平均变换.
每当noisySequence滴答时,emaSequence应该勾选并返回该值
(prevIoUsEmaSequenceValue *(1-lambda)latestNoisySequenceValue * lambda)
我猜我们使用的是主题,但究竟是怎么回事?
- public static void Main()
- {
- var rand = new Random();
- IObservable<double> sequence = Observable
- .Interval(TimeSpan.FromMilliseconds(1000))
- .Select(value => value + rand.NextDouble());
- Func<double,double> addNoise = x => x + 10*(rand.NextDouble() - 0.5);
- IObservable<double> noisySequence = sequence.Select(addNoise);
- Subject<double> exponentialMovingAverage = new Subject<double>(); // ???
- sequence.Subscribe(value => Console.WriteLine("original sequence "+value));
- noisySequence.Subscribe(value => Console.WriteLine("noisy sequence " + value));
- exponentialMovingAverage.Subscribe(value => Console.WriteLine("ema sequence " + value));
- Console.ReadLine();
- }
解决方法
这是您可以将状态附加到序列的方法.在这种情况下,它计算最后10个值的平均值.
- var movingAvg = noisySequence.Scan(new List<double>(),(buffer,value)=>
- {
- buffer.Add(value);
- if(buffer.Count>MaxSize)
- {
- buffer.RemoveAt(0);
- }
- return buffer;
- }).Select(buffer=>buffer.Average());
但你可以使用Window(这是一种泛化的方式)来获得你的平均值.
- noisySequence.Window(10)
- .Select(window=>window.Average())
- .SelectMany(averageSequence=>averageSequence);