As a prerequisite to XRing and ZRing P2P algorithm implementations, I needed to write an Event Stream Processor (ESP). Though the version described in this entry is not nearly as mature as what is used within my implementation of these P2P overlay algorithms, I nevertheless decided to share an implementation expressing several overall concepts.
The implementation leverages Windows Communication Foundation (WCF) client callback channels and provides support for:
- Topic-based publishers and subscribers.
- The System.Transactions namespace; specifically supporting the IPromotableSinglePhaseNotification and IEnlistmentNotification interfaces.
- Out-of-process and in-process subscribers and publishers.
The two primary interfaces relevant to this discussion are defined below.
///
/// This interface provides a set of event publisher and event subscriber related operations.
///
[ServiceContract(CallbackContract =typeof(IEventSubscriber), SessionMode = SessionMode.Required,
Namespace = "http://schemas.idynamics.us.com/2007/Events")]
public interface IEventProcessor
{
///
/// Subscribes to the specified .
///
[OperationContract]
[TransactionFlow(TransactionFlowOption.Allowed)]
void Subscribe(Topic topic);
///
/// Subscribes to the specified with the specified configuration.
///
[OperationContract(Name="SubscribeWithConfiguration")]
[TransactionFlow(TransactionFlowOption.Allowed)]
void Subscribe(Topic topic, EventSubscriberConfiguration configuration);
///
/// Unsubscribes from the specified .
///
[OperationContract]
[TransactionFlow(TransactionFlowOption.Allowed)]
void Unsubscribe(Topic topic);
///
/// Publishes an to the specified topic.
///
[OperationContract(Name="PublishOne")]
[TransactionFlow(TransactionFlowOption.Allowed)]
void Publish(ref Event e);
///
/// Publishes an to the specified topic.
///
[OperationContract(Name="PublishAll")]
[TransactionFlow(TransactionFlowOption.Allowed)]
void Publish(ref IList events);
}
The IEventProcessor interface defines the basic contract for an ESP that associates the IEventSubscriber (described below) interface as a callback contract.
///
/// Represents an event callback contract, which is used in conjunction with the interface.
///
public interface IEventSubscriber
{
///
/// Called when an is published.
///
[OperationContract(IsOneWay=true)]
void OnPublished(Event e);
}
The OnPublished method is called during event publication and notifies the subscriber as specific events are published. Examples of the consumer subscription and publication APIs follow and can be found in the accompanying unit tests.
In-Process Publication:
Topic topic = new Topic("urn:Financial:Stocks:Ticker");
StockPriceChangeEvent e = new StockPriceChangeEvent(topic, "MSFT");
e.CurrentPrice = 95.0;
e.Change = -.35;
e.Effective = DateTime.Now;
using(TransactionScope scope = new TransactionScope())
{
EventProcessor.Instance.Publish(ref e);
scope.Complete();
}
In-Process Subscription:
public class StockEventSubscriber : EventSubscriber
{
public override void OnPublished(Event e)
{
if(!(e is StockPriceChangeEvent))
{ return;
}
//
// Do something with "e".
//
}
}
StockEventSubscriber callback = new StockEventSubscriber();
Topic topic = new Topic("urn:Financial:Stocks:Ticker"); EventProcessor.Instance.Subscribe(topic, callback);
Out-Of-Process Publication:
Topic topic = new Topic("urn:Financial:Stocks:Ticker");
StockPriceChangeEvent e = new StockPriceChangeEvent(topic, "MSFT");
e.CurrentPrice = 95.0;
e.Change = -.35;
e.Effective = DateTime.Now;
using(EventProcessorClient client = new EventProcessorClient()
{
using(TransactionScope scope = new TransactionScope())
{
client.Publish(ref e);
scope.Complete();
}
}
Out-Of-Process Subscription:
public class StockEventSubscriber : EventSubscriber
{
public override void OnPublished(Event e)
{
if(!(e is StockPriceChangeEvent))
{
return;
}
//
// Do something with "e".
//
}
StockEventSubscriber callback = new StockEventSubscriber();
using(EventProcessorClient client = new EventProcessorClient(new InstanceContext(callback)))
{
Topic topic = new Topic("urn:Financial:Stocks:Ticker");
client.Subscribe(topic);
}
}
I’ve intentionally left specific requirements surrounding preservation of order, delivery guarantees, and durability of both the subscriber and publisher to the reader and urge you to consider the architecture surrounding these three requirements especially when the Parallel Extensions for .NET are used.
The full source code is attached to this blog entry.