One of my colleges (http://weblogs.asp.net/sweinstein/) brought up an interesting problem: there is a common theme in finance applications to subscribe to a market data source to receive real time updates on the instruments that this source covers. A late subscriber (the one that has not been subscribed since the start of the trading day, for example) usually expects to receive a snapshot per instrument before real time updates. So what is the best way to support snapshot guarantee, per instrument, from a “hot” underlying source using Rx? There are multiple ways to do it, of course, and here is my version of it:
public static IObservable<T> WithCachedSnapshot<T, TKey>( this IObservable<T> source, Func<T, TKey> keySelector) { var cache = new Dictionary<TKey, T>(); return Observable.Defer(() => { lock (cache) return cache.Values.ToList().ToObservable(); }) .Concat(source.Do(s => { lock (cache) cache[keySelector(s)] = s; })); }
If you can’t take my word for it, here is the proof that it works:
[TestMethod()] public void CacheByKeyTestHelper() { // Arrange TestScheduler scheduler = new TestScheduler(); var source = scheduler.CreateHotObservable( OnNext(100, new { Ticker = "MSFT", Price = 30.5 }), OnNext(101, new { Ticker = "IBM", Price = 130.2 }), OnNext(102, new { Ticker = "MSFT", Price = 31.5 }), OnNext(103, new { Ticker = "IBM", Price = 130.1 }), OnNext(104, new { Ticker = "AAPL", Price = 150.5 })); // Act var target = RxExtensions.WithCachedSnapshot( source, quote => quote.Ticker); var results = scheduler.Run(() => target, 0, 0, 105); var cachedResults = scheduler.Run(() => target, 0, 0, 200); // Assert results.AssertEqual( OnNext(100, new { Ticker = "MSFT", Price = 30.5 }), OnNext(101, new { Ticker = "IBM", Price = 130.2 }), OnNext(102, new { Ticker = "MSFT", Price = 31.5 }), OnNext(103, new { Ticker = "IBM", Price = 130.1 }), OnNext(104, new { Ticker = "AAPL", Price = 150.5 })); cachedResults.AssertEqual( OnNext(105, new { Ticker = "MSFT", Price = 31.5 }), OnNext(105, new { Ticker = "IBM", Price = 130.1 }), OnNext(105, new { Ticker = "AAPL", Price = 150.5 })); } private Recorded<Notification<T>> OnNext<T>( long ticks, T value) { return new Recorded<Notification<T>>( ticks, new Notification<T>.OnNext(value)); }