Saturday, April 3, 2010

MediaRss libraries now on codeplex

I have just published a new Codeplex project: http://mediarss.codeplex.com/. This project has grown out of a larger effort I am currently working on. I had a need to read and write MediaRss files. If you aren’t familiar with the MediaRss spec it is an RSS spec originally developed by Yahoo for their video search engine. It has since been adopted by the wider community and is becoming more standardized. For details on the spec itself see here and here.

For what I am working on this format worked well for me. However, there wasn’t much out there in the way of C# libraries that did any of the heavy lifting for me. In general parsing RSS is trivial in .NET but there are a few oddities about this spec that lead me down the path of building my own library. It took a little tedious work to complete but now it as at point where it is useful to me and I felt it could be valuable to others looking for a quick solution to dealing with MediaRss files.

I leveraged the SyndicationFeed object model from .NET 3.5 to make consuming the feed feel natural to those already familiar with this model. The usage looks like this

var reader = XmlReader.Create("SampleMedia.rss");
var feed = SyndicationFeed.Load<MediaRssFeed>(reader);


What you get back is a strongly typed MediaRssFeed object. You can then treat it like you would any other SyndicationFeed object. I have also included an extension method to easily cast the Items property of the MediaRssFeed to the MediaRssItem object. To do that you would do:



var feed = SyndicationFeed.Load<MediaRssFeed>(reader);
IEnumerable<MediaRssItem> feedItems = feed.Items.AsMediaRssItems();


This lets you dive into all the properties on the items that are specific to the MediaRss spec. I still have a little more work to do to fully support the spec but it is most of the way there. The unit test project is best the place to see the samples of how to read and write using the library and how to inspect the different properties of the object. You can get the compiled binaries and / or the full source code from here. Enjoy.

Labels: , ,

Thursday, February 18, 2010

Adding Rx Framework to Courier

In a previous series of posts I introduced the Courier framework. Recently I have been doing some refactoring on the project and feature enhancements. The refactoring was fairly straight forward.

I was never quite happy with the way that you unregistered for a message so I changed the model of message registration to return a token and then subsequently changed the signature of unregister to take in this token. That change was pretty straight forward and badly needed.

The big feature in this update is the integration with the Rx Framework. If you don’t understand Rx I suggest diving into these videos. Be forewarned, the Rx guys are incredibly smart and tend to blaze through things very quickly. Much is left “up to the reader” to figure out and play with on their own. This makes it pretty difficult to grok. I am not sure I really understand all the moving parts of Rx.

One of my main goals in adding Rx to the Courier project was to allow you do include or exclude the Rx features with minimal pain. I also wanted to avoid a deep dependency on the Rx extension assembly. So I created a separate assembly for the Rx features that introduces an extension method to the Messenger class. This gives you an additional way to subscribe to a message and get an IObservable back. Since it is in a separate assembly you should be able to omit the assembly (and any dependency on Rx) pretty easily if you wanted to.

So what does this change let you do? Essentially I have added one extension method to the messenger that gives you different method to subscribe for a message. Syntactically it looks like this

IObservable myObservable = MyMessenger.RegisterForMessage("Foo");


This tells the messenger that you want to listen for the “Foo” message and get the result pushed into your myObservable sequence. So why is this cool? LINQ.



Plain and simple IObservable is queryable with LINQ and that makes for very interesting scenarios when you are dealing with a message pump that is very active and you want to filter out messages based on a criteria. I have added an example of how to use the new Rx extension method and tried to model the scenario into a real world situation.



Imagine you are listening to WCF service that is sending messages from a Blood Pressure monitor. Imagine the Messenger is broadcasting messages each time the Blood Pressure monitor sends some data.   This can lead to a flood of messages firing through the Messenger. Using this new extension method you can write a simple LINQ query to filter these messages and only “react” to the ones you care about.



In the scenario I show in the sample project I only care about messages when the blood pressure is outside of the “normal” range. In those cases I want to “react” and show some alert. I have written a quick and dirty simulation that will fire messages every second and randomize the pressure. When the pressure is outside of the predefined range I will get a message “pushed” into my IObservable sequence. When this happens I have a delegate that “reacts” to this “push” and does something (in my case I raise an alert)



So, from the snippet above I can take it one step further and do this:



IObservable<Object> observable = from msg in Mediator.RegisterForMessage("BloodPressureMessage")
where (((BloodPressure)msg).Systolic < 90 || ((BloodPressure)msg).Systolic > 119)
&& (((BloodPressure)msg).Diastolic < 60 || ((BloodPressure)msg).Diastolic > 79)
select msg;

observable.Subscribe(OnAlertReceived);


Notice how I am now writing a LINQ query on the IObservable that the messenger is returning from the RegisterForMessage method. The line after the LINQ query is how I subscribe to the “push” events that happen on the observable sequence. OnAlertReceived is the delegate I want invoked when a new message is “pushed” on the IObservavle sequence.



Cool? I think so. Also, this check in I am going to promote to Beta 2 status. I have added unit tests and done some other small cleanup work. I think it is getting close to being ready for a true release. I would like to buff out the sample application a bit more and get some more API documentation in place and work out and remaining bugs.



one little nit that I want to clean up in the next version is the casting of the “msg” in the LINQ Query. There are some internal issues with how I am implementing Rx under the covers that makes strong typing of the resulting observable challenging, I don’t have a solution for this yet given my current implementation, so if you find one please contribute it back to the project or let me know.

Labels: , ,

Monday, December 21, 2009

Decoupled ViewModel Messaging (Part 3)

NOTE: All the code shown in this series has been published to codeplex. If you would like to download the framework or the sample application that uses the framework you can find it here: http://courier.codeplex.com/

    In this part of our saga we find or intrepid hero trapped in a ……. Wait where was I? So far in this series we have covered decoupled messaging between ViewModels at a high level and we have looked at one specific enhancement that I have added to my courier framework that is lacking in other implementations.

     In this final post I am going to cover one more feature that I have added to the courier framework that I think provides a very nice addition to the overall process of decoupled ViewModel messaging and that is message caching.

    When I first started with this framework I really just set out to wrap my head around what others had done (CoreMVVM,Cinch, Prism, MVVM Foundation). Once I had gotten to that point I wanted to add a little something more to make the framework my own. What I found was that, in my scenarios, I found myself needing to cache messages for later retrieval. 

    The most obvious case for this is in the “Wizard” type scenario. When a user is performing a sequential set of actions I often have the need to pass data from one screen to the next. In this scenario the next screen is not created until after the previous screen is destroyed. This means that the message broadcast from screen one would happen before screen two is around to listen for it. With me so far?

    So, to solve this problem I figured I would implement some sort of caching mechanism that would allow me to:

  • Broadcast a message from View one
  • Save the message for re-broadcast
  • Destroy View one
  • Create View two
  • Register for that message from View two
  • Receive any, valid, cached copies of the message I registered for

The first thing I did was to implement an internal List<T> inside the mediator class that would store these CachedMessage objects. When a message is broadcast with caching options I will save a copy of the message to this List<T>. Any subsequent registrations for this message will receive any valid cached copies of the messages.

    So I added the following to the mediator

private readonly List<CachedMessage> cachedMessages = new List<CachedMessage>();


Where the CachedMessage class is defined like this:



[DebuggerDisplay("Message: {Message}, Parameter: {Parameter}")]
internal class CachedMessage
{
public String Message { get; private set; }
public Object Parameter { get; private set; }
public CacheSettings CacheOptions { get; private set;}
public Int32 ResendCount { get; set; }

public CachedMessage(String message, Object parameter)
{
Message = message;
Parameter = parameter;
}

public CachedMessage(String message, Object parameter, CacheSettings cacheOptions)
{
CacheOptions = cacheOptions;
Message = message;
Parameter = parameter;
}
}


    Two things to note here. First the CachedMessage class has a public property of type CacheSettings. This gives us the details of how long to keep the message for. Currently I am supporting two types of caching, time based and recurrence based. This means you can specify a specific DateTime when the message will expire or you can specify how many re-broadcasts before the message expires. The CacheSettings class looks like this:



[DebuggerDisplay("Expiration Date: {ExpirationDate}, NumberOfResends: {NumberOfResends}")]
public class CacheSettings
{
public DateTime ExpirationDate { get; private set; }
public Int32 NumberOfResends { get; private set; }

public CacheSettings(DateTime expiration)
{
ExpirationDate = expiration;
NumberOfResends = Int32.MaxValue;
}

public CacheSettings(Int32 timesToResend)
{
NumberOfResends = timesToResend;
ExpirationDate = DateTime.MaxValue;
}
}


    The second thing to note about both of these classes (CachedMessage and CacheSettings) is the use of the DebuggerDisplay attribute. I have just recently got in the habit of using this attribute religiously and I don’t know what I ever did without it. I won’t dive too in depth here about it, but suffice it to say, if you aren’t using it right now you should be. If you don’t know what it is stop reading here and jump over to my colleague, Adam Calderon’s, excellent post about it (don’t worry I will wait for you to come back).



    Ok, now that we are all writing Debugger friendly classes thanks to Adam, let’s wrap this post up with some usage scenarios. In the “Wizard” like case I talked about above I really just want to be able to broadcast a message and cache it for one re-broadcast. This allows me to broadcast the message from view one and then destroy view one and create view two. view two then registers for the message like it would normally and will immediately receive any messages that are in the cache. Once the message is dispatched from the cache we want to clean it out to prevent further re-broadcast. So first, two examples of how to call BroadcastMessage<T> with caching options



//NumberOfResend based caching example
Mediator.BroadcastMessage("Content1Message",messageContent,new CacheSettings(1));

//Time Based caching example
Mediator.BroadcastMessage("Content1Message", messageContent, new CacheSettings(DateTime.Now.Add(TimeSpan.FromSeconds(30))));


The first example show how to broadcast a message and specify that it should be cached for one re-broadcast. The second example shows how to broadcast a message and store it in the cache for 30 seconds after the first broadcast. Pretty straightforward so far. Now the two key methods in the Mediator class that handle keeping the cache clean and dispatching messages from cache:



private void GetMessagesFromCache(String message, Delegate callback)
{
CleanOutCache();
//Search the cache for matches messages
List<CachedMessage> matches = cachedMessages.FindAll(action => action.Message == message);
//If we find matches invoke the delegate passed in and pass the message payload
matches.ForEach(delegate(CachedMessage action)
{
callback.DynamicInvoke(action.Parameter);
action.ResendCount++;
});
}

private void CleanOutCache()
{
//Remove any expired messages from the cache
cachedMessages.RemoveAll(message => (message.CacheOptions.ExpirationDate < DateTime.Now) || (message.ResendCount >= message.CacheOptions.NumberOfResends));
}


    You can see in the GetMessagesFromCache method the first thing it does is clean out any expired messages from the cache. The CleanOutCache method uses the RemoveAll method on List<T> and an lambda to do a particularly elegant line of code. This code checks two properties of the CachedMessage objects (is it’s expiration date in the past OR has it been re-broadcast the max number of times?). If either is true it is removed from the CachedMessages list. A very succinct line of code for a normally cumbersome task.



    After the cache has been cleaned we call FindAll (again, using a simple lambda) to get all the messages from cache that match the one we are looking for (There could be multiple messages from different senders in the cache and we want to dispatch them all). Once we have the matching messages we invoke the callback that is registered and we increment the re-send count on the message. Overall, a pretty simple process, but it turns out to be very powerful.



    That wraps up this 3 part series ( I, II, III) on the Courier framework. As I add new features to the framework I will post specific updates about the features. If anyone else is interested in joining the codeplex project and adding their ideas to the framework please feel free to contact me through this blog.  



NOTE: All the code shown in this series has been published to codeplex. If you would like to download the framework or the sample application that uses the framework you can find it here: http://courier.codeplex.com/

Labels: , , , , ,

Saturday, December 12, 2009

Decoupled ViewModel Messaging (Part 2)

NOTE: All the code shown in this series has been published to codeplex. If you would like to download the framework or the sample application that uses the framework you can find it here: http://courier.codeplex.com/

In my previous post I discussed the ViewModel messaging framework at a high level and went over the basics of the implementation. Most of this is review if you are familiar with the other frameworks that I borrowed ideas from.

In this post I want to touch on the two specific features I wanted to add to the other frameworks I had seen (hence the motivation behind this framework and this series of blog posts). Specifically, the two features I felt would be useful is the ability to explicitly unsubscribe from a particular message and the ability to cache messages for re-broadcast at a later date.

The first feature seems obvious enough. There are times when you want to listen for messages and there are times that you want to stop listening for that message. Many of the frameworks mentioned previously use WeakReferences to store the pointer to the message handler method. This works great when object are being destroyed. The Mediator object, which is responsible for delegating messages to their proper handlers, stores only a WeakReference to the handler and checks just prior to dispatching the message if the endpoint IsAlive. If it is then the message is dispatched. If it isn’t alive, then the endpoint is removed from the list of subscribers. This check looks something like this:

for (int i = weakSubscribers.Count - 1; i > -1; --i)
{
WeakSubscriber weakSubscriber = weakSubscribers[i];
if (!weakSubscriber.IsAlive)
weakSubscribers.RemoveAt(i);
else
subscribers.Add(weakSubscriber.CreateAction());
}


What this does is simple. It walks the collection of subscribers backwards and checks the IsAlive property. The IsAlive property on the WeakSubscriber class is simply looking at the underlying WeakReference.IsAlive property. If we find a dead subscriber we remove it from our list so that we don’t try to dispatch the message to an event handler method that has been destroyed.



This pattern works great is your subscriber objects, that is the object that has the event handler for the message in it, is being destroyed. However, I have situations where my object that contains the handler method for a message is still alive but I don’t want to receive any more messages of a given type from the mediator. In this case I need a way to tell the Mediator to remove from it’s list of subscribers.



To help with this I have added a method to the Messenger class call UnRegisterForMessage.



//Method bodies elided for clarity 
public void UnRegisterForMessage(String message, Delegate callback)
{
...
}


This method takes in the message you want to unsubscribe to and the handler you want to remove for that message. For a given message and object can have multiple handlers. This method allows you to unsubscribe a particular handler while, potentially keeping the other handlers subscribed.



On caveat to this approach is how I am passing the reference to the handler method. In the internal unregister process  I need a way to determine which handler I am removing from which message. The only way I could find to make this work was to pass a reference to the delegate itself. The remove method inside the MessageToSubscriberMap class looks like this:



internal void RemoveSubscriber(String message, Delegate callback)
{
lock (map)
{
if (map.ContainsKey(message))
{
map[message].RemoveAll(subscriber => subscriber.Method == callback.Method);
}
}
}


This works fine but the usage is a bit strange and has one glaring issue that I have yet to find a solution for. Let me demonstrate. I if I want to unsubscribe from a message called “UserChanged.” I would write something like this:



Mediator.UnRegisterForMessage("UserChanged", (Action<IUser>)OnUserChanged)


Notice the second parameter here. I am the reference to the delegate OnUserChanged and casting it as an action. This will work just fine. However, in some internal projects I have used this framework and noticed other developers do something like this:



Mediator.UnRegisterForMessage("UserChnaged", new Action<IUser>(OnUserChnaged))


Notice the difference? Subtle maybe, but it is critical to understand the problem here. In the second example the second parameter, instead of being cast as an Action<T> a new instance of an Action<T> is being created and passed in. This will not point to the same instance of the delegate passed in on the Register call. This means the unsubscribe will not find a match and will not remove the handler. This means you will continue to get messages broadcast. In general, I am not completely satisfied with the unsubscribe method signature and may change it in the future.



  This post is getting a bit long winded so I will stop here and in my next post I will describe message caching and how I went about implementing it.



NOTE: All the code shown in this series has been published to codeplex. If you would like to download the framework or the sample application that uses the framework you can find it here: http://courier.codeplex.com/

Labels: , , , ,

Tuesday, November 17, 2009

Decoupled ViewModel Messaging (Part 1)

 

NOTE: All the code shown in this series has been published to codeplex. If you would like to download the framework or the sample application that uses the framework you can find it here: http://courier.codeplex.com/

Lately I have been playing around with different ways to decouple the communication between ViewModels in the MVVM pattern. My focus has been in WPF, as this is what the MVVM pattern is suited for however, this messaging technique is not specific to WPF and could be used in any other decoupled solution (MVC, MVP etc..)

I poked around on the tubes looking for examples of how other people have solved this problem and there are many different options out there. My implementation borrows heavily from the CoreMVVM framework on codeplex which itself borrows bits from Cinch, Prism, MVVM Foundation etc.. 

My first goal was to really just get down in the weeds and understand how these other frameworks were handling decoupled messaging between objects. Once I wrapped my head around it I then came up with two features that I thought would be nice to have in a messaging solution that I hadn’t seen in other frameworks.

The ability to manually unsubscribe from messages and the ability cache messages for re-broadcast.

Before I dive in and talk about the new features I wanted to add let me go over the basic implementation of the Mediator pattern I did. Again, this is very similar to how the CoreMVVM framework does it, so if you are familiar with that framework this should review.

The basis of the implementation is a basic mediator pattern. You have a mediator object that stores a mapping of subscribers and messages. This gives the mediator enough information to understand what messages to dispatch to what subscribers. The mediator in my case looks like this 

//Method bodies elided for clarity
public class Mediator
{
private readonly MessageToSubscriberMap subscribers = new MessageToSubscriberMap();
private readonly List<CachedMessage> cachedMessages = new List<CachedMessage>();

public void RegisterForMessage(String message, Delegate callback)
{
...
}

public void UnRegisterForMessage(String message, Delegate callback)
{
...
}

public void BroadcastMessage<T>(String message, Boolean cacheMessage, T parameter)
{
...
}

public void BrodcastMessage<T>(String message, Boolean cacheMessage)
{
...
}

private void GetMessagesFromCache(String message, Delegate callback)
{
...
}
}


From the snippet above you can see there isn’t much going on in the Mediator. The key is really the MessageToSubscriberMap object defined at the top. This object is what stores the (weak) reference to the subscriber (in this case a delegate to be invoked) and the message that subscriber is interested in.



Here are the guts of the MessageToSubscriberMap class



//Method bodies elided for clarity
internal class MessageToSubscriberMap
{
//Store mappings with weak references to prevent leaks
private readonly Dictionary<String, List<WeakSubscriber>> map = new Dictionary<String, List<WeakSubscriber>>();
internal void AddSubscriber(String message, Object target, MethodInfo method, Type subscriberType)
{
...
}

internal void RemoveSubscriber(String message, Delegate callback)
{
...
}

internal List<Delegate> GetSubscribers(String message)
{
...
}
}


The key piece of this map is the private Dictionary field. This is the mapping of Messages to the collection of WeakSubscribers that are listening for that message. I have called the object WeakSubscriber to indicate that a subscriber is really a WeakReference to a delegate. Here is the complete implementation of the WeakSubscriber class



internal class WeakSubscriber
{
private readonly MethodInfo method;

public MethodInfo Method { get { return method; } }

private readonly Type delegateType;
private readonly WeakReference weakReference;

internal WeakSubscriber(Object target, MethodInfo method, Type parameterType)
{
//create a WeakReference to store the instance of the target in which the Method resides
weakReference = new WeakReference(target);
this.method = method;
delegateType = parameterType == null ? typeof(Action) : typeof(Action<>).MakeGenericType(parameterType);
}

internal Delegate CreateAction()
{
Object target = weakReference.Target;
return target != null ? Delegate.CreateDelegate(delegateType,weakReference.Target,method) : null;
}

public Boolean IsAlive
{
get { return weakReference.IsAlive; }
}

}


Ok so now we have seen the extent of my little messaging framework. Again, most of this code is very similar to the CoreMVVM framework implementation of the mediator pattern so, for users of that framework, this should be familiar.



I am going to wrap this post up with a quick example of how you would subscribe to a message and how you would broadcast a message with this framework.



To broadcast a message you would do this:



Mediator.BroadcastMessage("Content1Message",true, messageContent);


Note the second parameter in this method call is for caching the message which I will explain in a following post.



If you wanted to subscribe to this Content1Message you would register for the message like this:



Mediator.RegisterForMessage("Content1Message", (Action<String>)OnContent1MessageReceived);


In the next post I am going to describe the two new features I added to the framework which are the ability to unsubscribe from a message and the ability to cache messages for re-broadcast.



This project was built in Visual Studio 2010 Beta 2, If you haven’t already I highly recommend downloading VS2010



As a side note, the specific reason I using VS2010 is because I plan on parallelizing the dispatching of messages to subscribers in the future and want to use the new parallel framework in .NET 4.0 



NOTE: All the code shown in this series has been published to codeplex. If you would like to download the framework or the sample application that uses the framework you can find it here: http://courier.codeplex.com/

Labels: , , , , ,