Throttling High-Frequency Data Feed Using Reactive Extensions for .NET


Reactive Framework Labs

This article is part of the “Processing Financial Data with Microsoft Reactive Framework (Reactive Extensions for .NET)” series, which demonstrates some basic applications of the Rx framework to financial data. It shows how to throttle an incoming feed to a desired frequency.

A Visual Studio 2008 solution containing the full source code for all examples can be downloaded here.

Throttling Implementation

The key operator here is “Sample”. Sampling at a desired rate effectively returns a throttled stream. My intuition was that if the underlying feed does not produce a value during the sampling interval, “Sample” would repeat the most recent available value when sampling is due. But it doesn’t, and this makes it a perfect fit for throttling. Note that there is also an operator called “Throttle”, but its name is misleading and its behavior most likely will not match your definition of throttling. The way it is implemented, it will not emit any values at all if the underlying feed produces values faster than the throttling interval, so it looks more like “Choke” to me.
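To make the difference between the two operators concrete, here is a minimal sketch that runs both against the same fast source. It assumes a current System.Reactive package rather than the early Rx build this article was written against, and the 100 ms `Observable.Interval` source is a hypothetical stand-in for a market data feed.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

class SampleVersusThrottle
{
    static void Main()
    {
        // Hypothetical fast source (an assumption for this sketch): one tick every 100 ms.
        var fastFeed = Observable.Interval(TimeSpan.FromMilliseconds(100));

        // Sample: once per second, pass on the latest value seen during that second.
        using (fastFeed.Sample(TimeSpan.FromSeconds(1))
                       .Subscribe(v => Console.WriteLine("Sample:   {0}", v)))
        // Throttle: pass a value on only after a full second with no newer value.
        using (fastFeed.Throttle(TimeSpan.FromSeconds(1))
                       .Subscribe(v => Console.WriteLine("Throttle: {0}", v)))
        {
            // Let both subscriptions run for a few seconds, then dispose them.
            Thread.Sleep(TimeSpan.FromSeconds(5));
        }
    }
}
```

While the source keeps ticking faster than once per second, the “Sample” subscription should print roughly one line per second, and the “Throttle” subscription should stay silent, because the quiet gap it waits for never occurs.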

Let's look at the following code sample:

// simulate very fast (no delays) market data feed
var rnd = new Random();
var unthrottledFeed = Observable.Defer(() =>
    Observable.Return(30.0)
    .Select(p => Math.Round(p + rnd.NextDouble(), 2))
    .Repeat());

// running unthrottled feed as a demo
ManualResetEvent unthrottledDone = new ManualResetEvent(false);
int count = 0;
unthrottledFeed
    .Do(p => ++count)
    .Until(Observable.Return("Done unthrottled feed")
        .Delay(TimeSpan.FromSeconds(5)))
    .Subscribe(Console.WriteLine, () => unthrottledDone.Set());
unthrottledDone.WaitOne();
Console.WriteLine(
    "\nDone unthrottled feed. Avg rate: {0} values per second",
    count / 5);

// running same feed, now throttled
Console.WriteLine("Now throttling at 1 value per second...\n");
var throttledFeed = unthrottledFeed.Sample(TimeSpan.FromSeconds(1));
throttledFeed.Subscribe(Console.WriteLine);

Console.ReadKey(true);

The first code block is a simple fast feed simulator. It produces values as fast as your CPU allows, or about 3,000 per second (with printing to the console) on my laptop. Most UI data binding engines will not be happy to accommodate this rate.

The second code block runs the non-throttled feed for about 5 seconds, just so you can get a feel for how high the frequency is.

The third code block wraps the non-throttled feed with a “Sample” operator with a one-second sampling interval. As discussed above, the current implementation of “Sample” is effectively throttling. When you subscribe to the throttled feed, values are emitted at a frequency of one value per second, with any values in between dropped.
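The observation that “Sample” does not repeat a stale value can be checked with a feed that goes quiet for a while. The following is only a sketch under assumptions: it uses a current System.Reactive package, and the hand-driven `Subject<double>` and its timings are hypothetical, chosen just to produce a busy period followed by a silent one.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class SampleOnQuietFeed
{
    static void Main()
    {
        // Hypothetical feed, pushed by hand so that it can go quiet.
        var feed = new Subject<double>();

        using (feed.Sample(TimeSpan.FromSeconds(1))
                   .Subscribe(p => Console.WriteLine(
                       "{0:HH:mm:ss} sampled {1}", DateTime.Now, p)))
        {
            // Busy period: a price every 50 ms for about two seconds.
            for (int i = 0; i < 40; i++)
            {
                feed.OnNext(30.0 + i / 100.0);
                Thread.Sleep(50);
            }

            // Quiet period: no prices at all for three seconds.
            Thread.Sleep(TimeSpan.FromSeconds(3));

            // One late price, then wait long enough for the next sampling tick.
            feed.OnNext(31.0);
            Thread.Sleep(TimeSpan.FromSeconds(2));
        }
    }
}
```

The timestamps in the output should show a gap during the quiet period instead of repeated prices, which is the behaviour that makes “Sample” usable as a throttle.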

6 comments
  1. sergun said:

    Nice post!

    And how would a more complex simulator be implemented via Rx?

    One with support for order registration and order cancellation methods, and order state change notifications?

  2. Ashish Singh said:

    This is not working with the new Rx SDK, as the first subscription blocks the whole application. Can anyone have a look into this?
