Detecting Running High/Low Prices Using Reactive Framework – Follow-up

This article shows some improvements to the original code presented in my previous post. Improvements are based on the feedback I received from Erik Meijer and his team and mainly focus on some built-in opportunities in Rx framework that I missed. Here is an extract from the original implementation of the running low feed:

// Daily low price feed
double min = double.MaxValue;
var feedLo = feed
    .Where(p => p < min)
    .Do(p => min = Math.Min(min, p))
    .Select(p => "New LO: " + p);

This implementation depends on the external (to the framework) state (current low value stored in “min” local variable), “Do” combinator to continuously update this state with every price tick, and a “Where” combinator to suppress the ticks that do not represent running low value. Turns out, there is a better, more elegant and framework friendly way to implement this. It relies on the built-in “Scan” and “HoldUntilChanged” combinators:

// Daily low price feed
var feedLo = feed
    .Select(p => "New LO: " + p);

The code above is functionally equivalent to the original one but is cleaner, more compact, and easier to read. This is how it works:

  • “Scan” operator is used to accumulate the running min value. “Scan” is similar to “Aggregate” in the LINQ-to-Objects world but, instead of collapsing the sequence to a single accumulated value, it keeps outputting the current accumulated value with every tick. As a result “Math.Min” function is continuously applied with every new tick with accumulated min value and new price as arguments and its result is “ticked out”. This produces a stream where every successive number is either less than or equal to the previous one. Close to what we need except we want to suppress duplicates.
  • “HoldUntilChanged” is another built-in combinator that wraps an underlying stream and only repeats a value from it if this value differs from the previous one. Perfect for removing successive duplicates (please, note, it will NOT remove ALL the duplicates, only successive ones, so it is not a proper substitute to Distinct, when such a combinator exists) . We use “HoldUntilChanged” instead of original “Where” clause.

Here is the full code:

// simulate market data
var rnd = new Random();
var feed = Observable.Defer(() =>
    Observable.Return(Math.Round(30.0 + rnd.NextDouble(), 2))
    .Delay(TimeSpan.FromSeconds(1 * rnd.NextDouble())))

// Daily low price feed
var feedLo = feed
    .Select(p => "New LO: " + p);

// Daily high price feed
var feedHi = feed
    .Select(p => "New HI: " + p);

// Combine hi and lo in one feed and subscribe to it


Happy holidays!


Leave a Reply

Fill in your details below or click an icon to log in: Logo

You are commenting using your account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: