Sunday, April 15, 2012

Switch Stream Pattern in Reactive Extensions

This post shows a usage pattern for the Switch operator in Reactive Extensions (Rx). There are more than a handful of Rx operators that I haven't yet found a use for, but Switch is one I have used effectively. Here is the scenario where it helped.

Before we begin, it's important to understand that subscribing in Rx is a two-step process:

  1. Create a stream.
  2. Subscribe to the stream.
It's implicit in the steps above that a stream must be created before you can subscribe to it. Switch allows a scenario where the subscription goes ahead even though you don't have the real stream yet; in effect, step 2 can happen without step 1. That sounds counterintuitive, but it's what Switch lets us do. You still need some stream to subscribe to, so you create an empty stream that stands in for the actual stream until the actual one is available. In this post I use the words Subscriber and Client interchangeably; both mean a subscriber to an Observable.

Set up:
- Client wants a stream of data

Design Constraint:
-  You want to give it out conditionally. This can be because:
  • You are not ready yet because the business logic is not available. In other words, the business logic that governs readiness of the stream arrives after the client has asked for the stream. Whether the subscription happens before or after that business logic becomes available is inconsequential.
  • The business logic dictating the availability of the stream can change at any time, even after the client has subscribed. We must account for the case where the business logic reverts the condition so that the stream is no longer available to the subscriber (client).
-  The client/subscriber shouldn't have to worry about the business logic dictating the availability of the actual stream. That logic should stay with the source (observable).

In other words, whenever you, as the owner of a stream, are not sure about giving the stream to potential clients who want to subscribe to it, use a Switch Stream. A Switch Stream allows you to change your mind later without the client interfering or even knowing about it.

For example, let's say you have the following stream of longs that the client is interested in:
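
A minimal sketch of such a stream, assuming Rx.NET (the System.Reactive package) and C# top-level statements. The name actualLongStream is the one used later in this post; the one-second period is an assumption:

```csharp
using System;
using System.Reactive.Linq;

// Assumed stand-in for the real stream: emits 0, 1, 2, ... once per second
// to each subscriber.
IObservable<long> actualLongStream = Observable.Interval(TimeSpan.FromSeconds(1));
```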

At the time the client requests the stream for a potential subsequent subscription, you are not certain whether you want to give it out, or you may not have the stream yet. However, you don't want to stop the client from having some stream and subscribing to it. To compensate for our lack of readiness, let's create a dummy empty stream (of the same type as the actual stream), which clients will have until we decide whether to hand out the real stream.
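
A minimal sketch of such a placeholder, assuming Rx.NET and the name emptyLongStream used later in this post. The choice of Observable.Empty is an assumption:

```csharp
using System;
using System.Reactive.Linq;

// Placeholder with the same element type as the real stream.
// Observable.Empty completes immediately without emitting a value;
// Observable.Never<long>() would be an alternative that stays silent
// without ever completing.
IObservable<long> emptyLongStream = Observable.Empty<long>();
```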

What we want to do is the following:

We want to do this in such a way that no action from the source or the subscriber is required. To simulate the bool condition, we use a stream of bools that drives the switch between the two streams (real and empty):
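
One way to get such a driver stream is a Subject, which the source can push values into by hand. This is a sketch under that assumption; the name boolStream is the one used later in this post:

```csharp
using System.Reactive.Subjects;

// Driver stream: the source pushes true to expose the real stream
// and false to fall back to the empty one.
var boolStream = new Subject<bool>();
```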

Now that we have the following three streams,
  1. Real stream (of type long in this case)
  2. Empty stream (always of the same type as the real stream)
  3. Driver stream (always of type bool)
we can create the switch stream that is handed out to potential subscribers. The client will get and subscribe to switchStream even though it wanted actualLongStream. It's important to understand that our intention is this: whenever the latest value of boolStream is true, we give out actualLongStream; otherwise we give out emptyLongStream. The client is oblivious to all this:
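
A minimal sketch of how the three streams can be combined, assuming Rx.NET. The stream definitions are illustrative stand-ins, and only Select and Switch carry the pattern:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Illustrative definitions of the three streams (assumptions, see above).
IObservable<long> actualLongStream = Observable.Interval(TimeSpan.FromSeconds(1));
IObservable<long> emptyLongStream = Observable.Empty<long>();
var boolStream = new Subject<bool>();

// Map every driver value to the stream that should currently be live.
// Switch then forwards values only from the most recently selected stream,
// unsubscribing from the previous one each time the driver ticks.
IObservable<long> switchStream = boolStream
    .Select(isAvailable => isAvailable ? actualLongStream : emptyLongStream)
    .Switch();
```

Only switchStream is handed to the client; boolStream stays with the source.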

At this point our stream is constructed and ready for subscription. Subscribing as follows will not produce anything yet, because the bool stream has not ticked and so the stream hasn't become hot:

However, if at this stage we drive our bool stream as follows, we will see that the client is automatically subscribed to the actual stream and gets data from it:

Similarly, the client can be switched back to the empty stream by doing the following:
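
The whole sequence (subscribe, tick true, tick false) can be sketched deterministically. In this sketch a Subject stands in for the real stream so that values can be pushed by hand; that substitution and the received list are assumptions made for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// A Subject replaces the timer-based stream so the timing is under our control.
var actualLongStream = new Subject<long>();
IObservable<long> emptyLongStream = Observable.Empty<long>();
var boolStream = new Subject<bool>();

IObservable<long> switchStream = boolStream
    .Select(isAvailable => isAvailable ? actualLongStream : emptyLongStream)
    .Switch();

var received = new List<long>();
IDisposable subscription = switchStream.Subscribe(received.Add);

actualLongStream.OnNext(1);  // not delivered: the driver has not ticked yet
boolStream.OnNext(true);     // Switch subscribes the client to the real stream
actualLongStream.OnNext(2);
actualLongStream.OnNext(3);
boolStream.OnNext(false);    // Switch drops the real stream and moves to the empty one
actualLongStream.OnNext(4);  // not delivered: the client is on the empty stream

Console.WriteLine(string.Join(", ", received)); // prints "2, 3"
subscription.Dispose();
```

The client subscribed once and never touched boolStream; all switching happened on the source side.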

Switch subscribes and unsubscribes behind the scenes. To verify this, I went back to the subscription snooping discussed in "Subscription snooping in Rx". I altered my streams to log subscriptions and disposals:

Ticking the bool stream to false like this:

shows that the empty long stream was subscribed to (due to boolStream.OnNext(false)):


Now if we tick the bool stream to true:

we can see that the previously subscribed empty stream has been unsubscribed and the actual long stream has been subscribed. Since the actual stream is hot, you see the data as well. All this subscribe/unsubscribe switching is done by Switch behind the scenes:


Summary:

- Consider using the Switch operator when you want to give out an observable conditionally.

Using the Switch operator:
- Create the desired stream as you normally would.
- Create an empty stream of the same type as the desired stream.
- Create a bool stream that ticks based on some condition known to the source.
- Create a switch stream that is driven by the bool stream.

Creating Reactive Streams

Here are some samples that I find useful when experimenting with various Rx constructs. Typically I use Observable.Interval as the starting point for streams of any kind.

The following creates a stream of random ints:
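
A minimal sketch, assuming Rx.NET and a one-second tick (both the period and the variable names are assumptions):

```csharp
using System;
using System.Reactive.Linq;

// Note: Random is not thread-safe, which is acceptable for a single
// subscriber in a sample like this.
var random = new Random();

// One random non-negative int per tick; the tick value itself is ignored.
IObservable<int> randomInts = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Select(_ => random.Next());
```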

The following creates a stream of random ints between 10 and 1000:
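
A sketch under the same assumptions (Rx.NET, one-second tick). Note that the upper bound of Random.Next(min, max) is exclusive:

```csharp
using System;
using System.Reactive.Linq;

var random = new Random();

// Values fall in [10, 1000): 10 is included, 1000 is not.
IObservable<int> randomIntsInRange = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Select(_ => random.Next(10, 1000));
```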

We can also use Select to convert the stream to a stream of booleans. The following creates a never-ending stream that alternates True, False, ticking once per second:
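
One way to sketch this, assuming Rx.NET, is to test whether the tick counter is even:

```csharp
using System;
using System.Reactive.Linq;

// Interval emits 0, 1, 2, ...; even ticks map to true and odd ticks to false,
// so the sequence starts with true and alternates from there.
IObservable<bool> alternatingBools = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Select(tick => tick % 2 == 0);
```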

The following creates a random stream of bools:
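
A sketch assuming Rx.NET and a one-second tick; drawing 0 or 1 and comparing with 0 is just one of several ways to get a random bool:

```csharp
using System;
using System.Reactive.Linq;

var random = new Random();

// Random.Next(2) returns 0 or 1, so each tick is true about half of the time.
IObservable<bool> randomBools = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Select(_ => random.Next(2) == 0);
```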

Stream of random doubles:
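
A sketch assuming Rx.NET and a one-second tick:

```csharp
using System;
using System.Reactive.Linq;

var random = new Random();

// NextDouble returns a value in [0.0, 1.0).
IObservable<double> randomDoubles = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Select(_ => random.NextDouble());
```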

Stream of random doubles between two numbers. I am using the GetRandomNumber method from here. The following creates an infinite stream of doubles between 5 and 20, ticking once per second:
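
A sketch assuming Rx.NET. The GetRandomNumber helper below is a hypothetical reconstruction (the linked original is not reproduced here); it simply scales NextDouble into the requested range:

```csharp
using System;
using System.Reactive.Linq;

var random = new Random();

// Hypothetical helper: maps a value from [0, 1) into [minimum, maximum).
double GetRandomNumber(double minimum, double maximum) =>
    random.NextDouble() * (maximum - minimum) + minimum;

IObservable<double> randomDoublesInRange = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Select(_ => GetRandomNumber(5, 20));
```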

If I were to generate random prices for a particular security, I could do it like this. It generates Bid and Ask continuously, with Bid always greater than a minimum price and Ask always greater than Bid:
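
A sketch of such a query, assuming Rx.NET. The minimum price, the minimum step and the spread sizes are all made-up values for illustration:

```csharp
using System;
using System.Reactive.Linq;

var random = new Random();
const double minPrice = 100.0; // assumed floor for the bid
const double minStep = 0.01;   // assumed smallest gap, keeps the inequalities strict

// Each tick draws a bid strictly above minPrice, then an ask strictly above the bid.
var quotes =
    from tick in Observable.Interval(TimeSpan.FromSeconds(1))
    let bid = minPrice + minStep + random.NextDouble() * 5
    let ask = bid + minStep + random.NextDouble()
    select new { Bid = bid, Ask = ask };
```

Subscribing with a handler such as `q => Console.WriteLine($"Bid={q.Bid:F2} Ask={q.Ask:F2}")` prints one quote per second.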

The above query shows the following results in the console:


The equivalent query in LINQPad is (setting up LINQPad for Rx is described here):

This shows up like this in LINQPad:





Saturday, April 14, 2012

Subscription snooping in Rx

When debugging with Rx, I have often needed to know
  • whether a subscription to the source took place
  • whether the client was still subscribed
  • whether the subscription was disposed accidentally and, if so, when.
These are especially useful to know when one is starting to learn Rx. To find out, I created a simple extension method and then extended it to cover the various stages of subscription and disposal. The following class handles these stages of a subscription:

Subscribing (before the subscription takes place)
Subscribed (after the subscription has happened)
Disposing (before the subscription is disposed)
Disposed (after the subscription has been disposed)

For each stage, it accepts an Action that is called when that stage is reached. I abstracted it this way because I plan to extend the snooping abilities further.
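
A minimal sketch of a class along these lines, assuming Rx.NET. The name SnoopingObservable and the constructor shape are hypothetical:

```csharp
using System;
using System.Reactive.Disposables;

// Hypothetical wrapper: forwards to the source and invokes one callback per stage.
// Any of the four callbacks may be null.
public sealed class SnoopingObservable<T> : IObservable<T>
{
    private readonly IObservable<T> source;
    private readonly Action subscribing;
    private readonly Action subscribed;
    private readonly Action disposing;
    private readonly Action disposed;

    public SnoopingObservable(IObservable<T> source, Action subscribing,
        Action subscribed, Action disposing, Action disposed)
    {
        this.source = source ?? throw new ArgumentNullException(nameof(source));
        this.subscribing = subscribing;
        this.subscribed = subscribed;
        this.disposing = disposing;
        this.disposed = disposed;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        subscribing?.Invoke();                          // before subscribing to the source
        IDisposable inner = source.Subscribe(observer);
        subscribed?.Invoke();                           // after the subscription exists

        // Disposable.Create runs its action at most once, on the first Dispose call.
        return Disposable.Create(() =>
        {
            disposing?.Invoke();                        // before the inner subscription goes away
            inner.Dispose();
            disposed?.Invoke();                         // after it has been disposed
        });
    }
}
```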


Note: You should also be able to achieve this with Observable.Create, as shown here.
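
A sketch of the Observable.Create variant, assuming Rx.NET and C# top-level statements. The Snoop extension method and its reporting callback are hypothetical names, not part of Rx:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var log = new List<string>();
var source = new Subject<int>();

IDisposable subscription = source
    .Snoop("source", log.Add)
    .Subscribe(value => log.Add("value " + value));

source.OnNext(1);
subscription.Dispose();
source.OnNext(2); // not observed: the subscription is already disposed

Console.WriteLine(string.Join(" | ", log));
// prints "source subscribing | source subscribed | value 1 | source disposing | source disposed"

public static class SnoopExtensions
{
    // Hypothetical helper: reports each subscription stage through the callback.
    public static IObservable<T> Snoop<T>(
        this IObservable<T> source, string name, Action<string> report)
    {
        return Observable.Create<T>(observer =>
        {
            report(name + " subscribing");
            IDisposable inner = source.Subscribe(observer);
            report(name + " subscribed");
            return Disposable.Create(() =>
            {
                report(name + " disposing");
                inner.Dispose();
                report(name + " disposed");
            });
        });
    }
}
```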

This can then be wrapped in extension methods like this:

I added a few extension methods: OnSubscribing, OnSubscribed, OnDisposing and OnDisposed. Typically, though, one is only interested in the moments before subscribing and after disposal of the subscription. Here is sample code that uses them:

Here is the output in the console. Note that it immediately shows that the Take extension method disposes of the subscription:


Actually, I was puzzled as to why the subscription got disposed as shown above. After further investigation, when I removed the SubscribeOn combinator, it behaved differently:

The output here is as follows:


Note that the subscription is now not being disposed. These extension methods can shed some light on the inner workings of the Rx subscription flow.

Also note that I created another extension method, WhenSubscribedOrDisposed, which lets you specify all four actions together. The advantage is that we now create one object instead of four:


Useful Links:
  • Sample code is available here as a gist.
  • Using Observable.Create to do something similar is here.

Thursday, April 12, 2012

Creating Custom NLog target

I have started to use NLog and am currently exploring its features. One feature I was looking for was a way to capture everything that is logged by various classes in different parts of the application. I wanted to display the log in a window in a WPF application. After reading the docs, it came down to writing a custom target for NLog. In this post, I create an NLog target in an assembly and tell NLog to load and create the target from that assembly.

Step 1:

Declare the target in the nlog section of the config file and use it in the targets section. In the snippet below, NLogSample is the name of my executable, where the target is implemented. NLog uses reflection to look for the Target attribute in the assembly and adds the targets it finds to its internal collection. All this happens before the application starts to use NLog's logging features:
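
A sketch of what such an app.config can look like. NLogSample is the assembly name from this post; the target type RxTarget, the target name rxTarget and the catch-all rule are assumptions made for illustration:

```xml
<configuration>
  <configSections>
    <section name="nlog" type="NLog.Config.ConfigSectionHandler, NLog" />
  </configSections>
  <nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <!-- Tells NLog to scan this assembly for classes carrying the Target attribute. -->
    <extensions>
      <add assembly="NLogSample" />
    </extensions>
    <targets>
      <!-- xsi:type must match the name given in the Target attribute (assumed: RxTarget). -->
      <target name="rxTarget" xsi:type="RxTarget" />
    </targets>
    <rules>
      <!-- Assumed rule: send everything from every logger to the custom target. -->
      <logger name="*" minlevel="Trace" writeTo="rxTarget" />
    </rules>
  </nlog>
</configuration>
```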

Step 2:
Create the target in code. Here I simply let clients register (I am using Rx; you could use events) and the target informs them of each new message. NLog does offer a MemoryTarget class; however, that class does not expose any event for capturing a message as it is written. This target provides that functionality:
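
A minimal sketch of such a target, assuming NLog and Rx.NET. The class name RxTarget and the Messages property are hypothetical:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using NLog;
using NLog.Targets;

// "RxTarget" is an assumed name; the config refers to the target type by it.
[Target("RxTarget")]
public sealed class RxTarget : TargetWithLayout
{
    private readonly Subject<string> messages = new Subject<string>();

    // Clients subscribe here. AsObservable hides the Subject so that
    // subscribers cannot push values into it.
    public IObservable<string> Messages
    {
        get { return messages.AsObservable(); }
    }

    // NLog calls Write for every log event routed to this target.
    protected override void Write(LogEventInfo logEvent)
    {
        // Render the event with the target's layout and notify all subscribers.
        messages.OnNext(Layout.Render(logEvent));
    }
}
```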

Step 3:
Now you can use the target in code to capture the messages from it. To get the target from NLog, you can do the following:
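
A sketch of the lookup, assuming NLog and C# top-level statements. The target name rxTarget is an assumption and has to match the name declared in the config:

```csharp
using System;
using NLog;
using NLog.Targets;

// Configuration is null when NLog found no configuration at all,
// and FindTargetByName returns null when no target has that name.
Target target = LogManager.Configuration?.FindTargetByName("rxTarget");

Console.WriteLine(target == null
    ? "Target not found; check the nlog section of the config file."
    : "Found target of type " + target.GetType().Name);
```

To reach members that only the custom target has, cast the result to the concrete target type.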

Once we have the target, we can subscribe to the event and use it. Here I am adding each message to an ObservableCollection, which is bound to a ListBox in WPF:
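
A sketch of the subscription, assuming Rx.NET. A Subject stands in for the target's message stream so the snippet runs without NLog or WPF; the names messages and logLines are hypothetical:

```csharp
using System;
using System.Collections.ObjectModel;
using System.Reactive.Subjects;

// Stand-in for the message stream exposed by the custom target.
var messages = new Subject<string>();

// In the WPF sample this collection would be the ListBox's ItemsSource.
var logLines = new ObservableCollection<string>();

// Every message becomes a new item. In a real WPF application the Add call
// has to run on the UI thread, so the stream would need to be marshalled there.
IDisposable subscription = messages.Subscribe(logLines.Add);

messages.OnNext("Window resized to 800x600");
Console.WriteLine(logLines.Count); // prints 1

subscription.Dispose();
```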

The following shows the log captured by the collection and shown in the window. For the sample, I log each resize event of the window. When you run the application (link at the bottom), you will also get a log file in the Logs folder of the application. Any message logged after the ListBox is bound will appear in both the log and the window.


Links/Code:
How to write an NLog target
Sample code at GitHub can be downloaded from here. The code includes the NLog and Reactive assemblies used in the sample. It uses VS2010, .NET 4.0 and C#, and you should be able to build and run the sample if you have VS2010 or above.