This article shows some improvements to the original code presented in my previous post. The improvements are based on feedback I received from Erik Meijer and his team, and mainly concern built-in features of the Rx framework that I had missed. Here is an extract from the original implementation of the running low feed:
    // Daily low price feed
    double min = double.MaxValue;
    var feedLo = feed
        .Where(p => p < min)
        .Do(p => min = Math.Min(min, p))
        .Select(p => "New LO: " + p);
This implementation depends on state that lives outside the framework (the current low, stored in the local variable “min”), on a “Do” combinator that updates this state with every price tick, and on a “Where” combinator that suppresses the ticks that do not set a new running low. It turns out there is a more elegant and framework-friendly way to implement this, which relies on the built-in “Scan” and “HoldUntilChanged” combinators:
    // Daily low price feed
    var feedLo = feed
        .Scan(Math.Min)
        .HoldUntilChanged()
        .Select(p => "New LO: " + p);
- “Scan” accumulates the running minimum. It is similar to “Aggregate” in the LINQ-to-Objects world but, instead of collapsing the sequence to a single accumulated value, it outputs the current accumulated value with every tick. As a result, “Math.Min” is applied on every new tick, with the accumulated minimum and the new price as arguments, and its result is “ticked out”. This produces a stream in which every number is less than or equal to the previous one. That is close to what we need, except that we want to suppress duplicates.
- “HoldUntilChanged” is another built-in combinator. It wraps an underlying stream and passes a value on only if it differs from the previous one, which is perfect for removing successive duplicates. Please note that it does NOT remove ALL duplicates, only successive ones, so it is not a proper substitute for “Distinct”, should such a combinator exist. We use “HoldUntilChanged” in place of the original “Where” clause.
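To make the behavior of the two combinators concrete, here is a minimal sketch that mimics them over a plain in-memory sequence instead of an observable stream. The helpers “ScanSketch” and “HoldUntilChangedSketch” are hypothetical names invented for this illustration and are not part of Rx; they only approximate the semantics described above, under the assumption that the first tick is passed through as is. (As far as I know, later Rx releases expose the second operator under the name “DistinctUntilChanged”.)

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class RunningLowSketch
{
    // Hypothetical helper (not Rx): like Aggregate, but yields every
    // intermediate accumulated value instead of only the final one.
    public static IEnumerable<T> ScanSketch<T>(this IEnumerable<T> source, Func<T, T, T> accumulator)
    {
        using (var e = source.GetEnumerator())
        {
            if (!e.MoveNext()) yield break;
            var acc = e.Current;          // assumption: the first value is emitted unchanged
            yield return acc;
            while (e.MoveNext())
            {
                acc = accumulator(acc, e.Current);
                yield return acc;
            }
        }
    }

    // Hypothetical helper (not Rx): drops a value only when it equals
    // its immediate predecessor; earlier occurrences are not remembered.
    public static IEnumerable<T> HoldUntilChangedSketch<T>(this IEnumerable<T> source)
    {
        var comparer = EqualityComparer<T>.Default;
        var first = true;
        var previous = default(T);
        foreach (var item in source)
        {
            if (first || !comparer.Equals(item, previous))
                yield return item;
            previous = item;
            first = false;
        }
    }

    public static void Main()
    {
        var prices = new[] { 30.5, 30.7, 30.2, 30.2, 30.9, 30.1 };

        // Running minimum values: 30.5, 30.5, 30.2, 30.2, 30.2, 30.1
        var running = prices.ScanSketch(Math.Min).ToList();

        // Successive duplicates removed: 30.5, 30.2, 30.1
        var lows = running.HoldUntilChangedSketch().ToList();

        // Only successive duplicates go away: 1, 1, 2, 1 becomes 1, 2, 1
        var notDistinct = new[] { 1, 1, 2, 1 }.HoldUntilChangedSketch().ToList();

        Console.WriteLine(string.Join(" ", running));
        Console.WriteLine(string.Join(" ", lows));
        Console.WriteLine(string.Join(" ", notDistinct));
    }
}
```

Because the running minimum never increases, a value that has been replaced can never come back, so removing successive duplicates is enough here. The last example shows why that would not hold for an arbitrary stream.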
Here is the full code:
    // simulate market data
    var rnd = new Random();
    var feed = Observable.Defer(() =>
            Observable.Return(Math.Round(30.0 + rnd.NextDouble(), 2))
                .Delay(TimeSpan.FromSeconds(1 * rnd.NextDouble())))
        .Repeat();

    // Daily low price feed
    var feedLo = feed
        .Scan(Math.Min)
        .HoldUntilChanged()
        .Select(p => "New LO: " + p);

    // Daily high price feed
    var feedHi = feed
        .Scan(Math.Max)
        .HoldUntilChanged()
        .Select(p => "New HI: " + p);

    // Combine hi and lo in one feed and subscribe to it
    feedLo.Merge(feedHi).Subscribe(Console.WriteLine);

    Console.ReadKey(true);