Archive

Monthly Archives: October 2010

One of my colleges (http://weblogs.asp.net/sweinstein/) brought up an interesting problem: there is a common theme in finance applications to subscribe to a market data source to receive real time updates on the instruments that this source covers. A late subscriber (the one that has not been subscribed since the start of the trading day, for example) usually expects to receive a snapshot per instrument before real time updates. So what is the best way to support snapshot guarantee, per instrument, from a “hot” underlying source using Rx? There are multiple ways to do it, of course, and here is my version of it:

public static IObservable<T> WithCachedSnapshot<T, TKey>(
    this IObservable<T> source,
    Func<T, TKey> keySelector)
{
    var cache = new Dictionary<TKey, T>();
    return Observable.Defer(() => {
            lock (cache)
                return cache.Values.ToList().ToObservable();
        })
        .Concat(source.Do(s => {
            lock (cache) cache[keySelector(s)] = s;
        }));
}

If you can’t take my word for it, here is the proof that it works:

[TestMethod()]
public void CacheByKeyTestHelper()
{
    // Arrange
    TestScheduler scheduler = new TestScheduler();
    var source = scheduler.CreateHotObservable(
        OnNext(100, new { Ticker = "MSFT", Price = 30.5 }),
        OnNext(101, new { Ticker = "IBM", Price = 130.2 }),
        OnNext(102, new { Ticker = "MSFT", Price = 31.5 }),
        OnNext(103, new { Ticker = "IBM", Price = 130.1 }),
        OnNext(104, new { Ticker = "AAPL", Price = 150.5 }));

    // Act
    var target = RxExtensions.WithCachedSnapshot(
        source,
        quote => quote.Ticker);
    var results = scheduler.Run(() => target, 0, 0, 105);
    var cachedResults = scheduler.Run(() => target, 0, 0, 200);

    // Assert
    results.AssertEqual(
        OnNext(100, new { Ticker = "MSFT", Price = 30.5 }),
        OnNext(101, new { Ticker = "IBM", Price = 130.2 }),
        OnNext(102, new { Ticker = "MSFT", Price = 31.5 }),
        OnNext(103, new { Ticker = "IBM", Price = 130.1 }),
        OnNext(104, new { Ticker = "AAPL", Price = 150.5 }));

    cachedResults.AssertEqual(
        OnNext(105, new { Ticker = "MSFT", Price = 31.5 }),
        OnNext(105, new { Ticker = "IBM", Price = 130.1 }),
        OnNext(105, new { Ticker = "AAPL", Price = 150.5 }));
}

private Recorded<Notification<T>> OnNext<T>(
    long ticks, T value)
{
    return new Recorded<Notification<T>>(
        ticks,
        new Notification<T>.OnNext(value));
}