Event Stream Processor (ESP) using WCF

As a prerequisite to XRing and ZRing P2P algorithm implementations, I needed to write an Event Stream Processor (ESP). Though the version described in this entry is not nearly as mature as what is used within my implementation of these P2P overlay algorithms, I nevertheless decided to share an implementation expressing several overall concepts.

The implementation leverages Windows Communication Foundation (WCF) client callback channels and provides support for:

  1. Topic-based publishers and subscribers.
  2. The System.Transactions namespace; specifically supporting the IPromotableSinglePhaseNotification and IEnlistmentNotification interfaces.
  3. Out-of-process and in-process subscribers and publishers.
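
To make the second item more concrete, here is a minimal sketch of a volatile resource manager that holds published payloads until the ambient transaction commits. It is not taken from the attached source: the TransactionalBuffer type, its Enlist helper, and the string payloads are hypothetical, and only the IEnlistmentNotification half is shown (no promotable single-phase enlistment).

```csharp
using System;
using System.Collections.Generic;
using System.Transactions;

// Hypothetical helper (not part of the attached source): buffers payloads
// and releases them to a delivery callback only when the transaction commits.
public sealed class TransactionalBuffer : IEnlistmentNotification
{
    private readonly List<string> _pending = new List<string>();
    private readonly Action<string> _deliver;

    private TransactionalBuffer(Action<string> deliver)
    {
        _deliver = deliver;
    }

    // Creates a buffer and enlists it as a volatile resource manager
    // in the ambient transaction.
    public static TransactionalBuffer Enlist(Action<string> deliver)
    {
        if (Transaction.Current == null)
        {
            throw new InvalidOperationException("An ambient transaction is required.");
        }

        TransactionalBuffer buffer = new TransactionalBuffer(deliver);
        Transaction.Current.EnlistVolatile(buffer, EnlistmentOptions.None);
        return buffer;
    }

    public void Publish(string payload)
    {
        _pending.Add(payload);
    }

    public void Prepare(PreparingEnlistment preparingEnlistment)
    {
        // Phase one: vote to commit.
        preparingEnlistment.Prepared();
    }

    public void Commit(Enlistment enlistment)
    {
        // Phase two: the transaction committed, so deliver what was buffered.
        foreach (string payload in _pending)
        {
            _deliver(payload);
        }
        _pending.Clear();
        enlistment.Done();
    }

    public void Rollback(Enlistment enlistment)
    {
        // The transaction aborted, so discard the buffered payloads.
        _pending.Clear();
        enlistment.Done();
    }

    public void InDoubt(Enlistment enlistment)
    {
        // Outcome unknown; this sketch simply discards the payloads.
        _pending.Clear();
        enlistment.Done();
    }
}

public static class TransactionalBufferDemo
{
    public static void Main()
    {
        List<string> delivered = new List<string>();

        using (TransactionScope scope = new TransactionScope())
        {
            TransactionalBuffer.Enlist(delivered.Add).Publish("MSFT 95.0");
            scope.Complete();
        }

        using (TransactionScope scope = new TransactionScope())
        {
            // Complete() is never called, so this payload is discarded.
            TransactionalBuffer.Enlist(delivered.Add).Publish("MSFT 94.65");
        }

        Console.WriteLine("Delivered: " + delivered.Count);
    }
}
```

A real processor would also have to decide what to do when the outcome is in doubt; discarding is only the simplest choice.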

The two primary interfaces relevant to this discussion are defined below.

/// <summary>
/// This interface provides a set of event publisher and event subscriber related operations.
/// </summary>
[ServiceContract(CallbackContract = typeof(IEventSubscriber), SessionMode = SessionMode.Required,
 Namespace = "http://schemas.idynamics.us.com/2007/Events")]
public interface IEventProcessor
{
    /// <summary>
    /// Subscribes to the specified <see cref="Topic"/>.
    /// </summary>
    [OperationContract]
    [TransactionFlow(TransactionFlowOption.Allowed)]
    void Subscribe(Topic topic);

    /// <summary>
    /// Subscribes to the specified <see cref="Topic"/> with the specified configuration.
    /// </summary>
    [OperationContract(Name="SubscribeWithConfiguration")]
    [TransactionFlow(TransactionFlowOption.Allowed)]
    void Subscribe(Topic topic, EventSubscriberConfiguration configuration);

    /// <summary>
    /// Unsubscribes from the specified <see cref="Topic"/>.
    /// </summary>
    [OperationContract]
    [TransactionFlow(TransactionFlowOption.Allowed)]
    void Unsubscribe(Topic topic);

    /// <summary>
    /// Publishes an <see cref="Event"/> to the specified topic.
    /// </summary>
    [OperationContract(Name="PublishOne")]
    [TransactionFlow(TransactionFlowOption.Allowed)]
    void Publish(ref Event e);

    /// <summary>
    /// Publishes a list of <see cref="Event"/> instances to their specified topics.
    /// </summary>
    [OperationContract(Name="PublishAll")]
    [TransactionFlow(TransactionFlowOption.Allowed)]
    void Publish(ref IList<Event> events);
}

The IEventProcessor interface defines the basic contract for an ESP and designates the IEventSubscriber interface (described below) as its callback contract.

/// <summary>
/// Represents an event callback contract, which is used in conjunction with the <see cref="IEventProcessor"/> interface.
/// </summary>
public interface IEventSubscriber
{
    /// <summary>
    /// Called when an <see cref="Event"/> is published.
    /// </summary>
    [OperationContract(IsOneWay=true)]
    void OnPublished(Event e);
}

The OnPublished method is called during event publication and notifies the subscriber as specific events are published. Examples of the consumer subscription and publication APIs follow and can be found in the accompanying unit tests.

In-Process Publication:

Topic topic = new Topic("urn:Financial:Stocks:Ticker");
StockPriceChangeEvent e = new StockPriceChangeEvent(topic, "MSFT");
e.CurrentPrice = 95.0;
e.Change = -.35;
e.Effective = DateTime.Now;
using(TransactionScope scope = new TransactionScope())
{
    EventProcessor.Instance.Publish(ref e);
    scope.Complete();
}

In-Process Subscription:

public class StockEventSubscriber : EventSubscriber
{
    public override void OnPublished(Event e)
    {
        if(!(e is StockPriceChangeEvent))
        {
            return;
        }

        //
        // Do something with "e".
        //
    }
}

StockEventSubscriber callback = new StockEventSubscriber();
Topic topic = new Topic("urn:Financial:Stocks:Ticker");
EventProcessor.Instance.Subscribe(topic, callback);

Out-Of-Process Publication:

Topic topic = new Topic("urn:Financial:Stocks:Ticker");
StockPriceChangeEvent e = new StockPriceChangeEvent(topic, "MSFT");
e.CurrentPrice = 95.0;
e.Change = -.35;
e.Effective = DateTime.Now;

using(EventProcessorClient client = new EventProcessorClient())
{
    using(TransactionScope scope = new TransactionScope())
    {
        client.Publish(ref e);
        scope.Complete();
    }
}

Out-Of-Process Subscription:

public class StockEventSubscriber : EventSubscriber
{
	public override void OnPublished(Event e)
	{
		if(!(e is StockPriceChangeEvent))
		{
			return;
		}

		//
		// Do something with "e".
		//
	}
}

StockEventSubscriber callback = new StockEventSubscriber();
using(EventProcessorClient client = new EventProcessorClient(new InstanceContext(callback)))
{
	Topic topic = new Topic("urn:Financial:Stocks:Ticker");
	client.Subscribe(topic);

	//
	// Callbacks arrive only while the duplex channel is open, so wait
	// for events here before the client is disposed.
	//
}
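
For readers who want to experiment without WCF, the subscribe and publish flow shown above can be approximated with a small in-memory dispatcher. This is only a sketch under my own assumptions: the TopicDispatcher type is hypothetical, topics are plain strings, and payloads are strings rather than Event instances. It makes no attempt at ordering, delivery guarantees, or durability.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a topic-based processor.
public sealed class TopicDispatcher
{
    private readonly object _sync = new object();
    private readonly Dictionary<string, List<Action<string>>> _subscribers =
        new Dictionary<string, List<Action<string>>>();

    public void Subscribe(string topic, Action<string> callback)
    {
        lock (_sync)
        {
            List<Action<string>> callbacks;
            if (!_subscribers.TryGetValue(topic, out callbacks))
            {
                callbacks = new List<Action<string>>();
                _subscribers[topic] = callbacks;
            }
            callbacks.Add(callback);
        }
    }

    public void Unsubscribe(string topic, Action<string> callback)
    {
        lock (_sync)
        {
            List<Action<string>> callbacks;
            if (_subscribers.TryGetValue(topic, out callbacks))
            {
                callbacks.Remove(callback);
            }
        }
    }

    // Invokes every callback registered for the topic and returns how many ran.
    public int Publish(string topic, string payload)
    {
        Action<string>[] snapshot;
        lock (_sync)
        {
            List<Action<string>> callbacks;
            if (!_subscribers.TryGetValue(topic, out callbacks))
            {
                return 0;
            }
            // Copy so callbacks run outside the lock.
            snapshot = callbacks.ToArray();
        }

        foreach (Action<string> callback in snapshot)
        {
            callback(payload);
        }
        return snapshot.Length;
    }
}

public static class TopicDispatcherDemo
{
    public static void Main()
    {
        TopicDispatcher dispatcher = new TopicDispatcher();
        Action<string> onTick = delegate(string payload) { Console.WriteLine("Tick: " + payload); };

        dispatcher.Subscribe("urn:Financial:Stocks:Ticker", onTick);
        dispatcher.Publish("urn:Financial:Stocks:Ticker", "MSFT 95.0");

        dispatcher.Unsubscribe("urn:Financial:Stocks:Ticker", onTick);
        dispatcher.Publish("urn:Financial:Stocks:Ticker", "MSFT 94.65");
    }
}
```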

I’ve intentionally left the specific requirements surrounding preservation of order, delivery guarantees, and durability of both the subscriber and publisher to the reader. I urge you to consider the architecture surrounding these three requirements, especially when the Parallel Extensions for .NET are used.

The full source code is attached to this blog entry.

Dynamics.Esp.zip

Singleton<T>

In many ways software engineering strives to strike a balance between testability, scalability (and performance), execution logic and symmetry (in the case of distributed algorithms), and semantics (easily understood and usable) while remaining efficient in the use of system resources. In my opinion, these major variables are all of equal weighting; these define the elegance of a particular design.

For this discussion, I’d like to focus on two of the variables: testability and efficient use of system resources.

Garbage Collection

As a prerequisite, I recommend you read this article on how garbage collection works at a high-level in the Microsoft .NET Framework. If you don’t have time to read the article then do take time to read the next paragraph.

The following list gives a good idea of the sorts of things we should try to avoid to get the best performance out of the garbage collector:

  1. Too many allocations.
  2. Too-large allocations.
  3. Too many pointers.
  4. Too many roots.
  5. Too many object writes.
  6. Too many Almost-Long-Life objects.
  7. If you implement IDisposable then suppress finalization to reduce costs.
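
As an illustration of the last item, the usual dispose pattern calls GC.SuppressFinalize once cleanup has already happened, so the object does not have to go through the finalization queue. The ResourceHolder type below is a hypothetical placeholder; what it would actually release is left as comments.

```csharp
using System;

// Hypothetical type that owns some resource needing deterministic cleanup.
public class ResourceHolder : IDisposable
{
    private bool _disposed;

    public bool IsDisposed
    {
        get { return _disposed; }
    }

    public void Dispose()
    {
        Dispose(true);

        // Cleanup is done, so the finalizer no longer needs to run.
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (_disposed)
        {
            return;
        }

        if (disposing)
        {
            // Release managed resources here.
        }

        // Release unmanaged resources here.
        _disposed = true;
    }

    // Safety net for callers that forget to call Dispose.
    ~ResourceHolder()
    {
        Dispose(false);
    }
}

public static class ResourceHolderDemo
{
    public static void Main()
    {
        ResourceHolder holder = new ResourceHolder();
        using (holder)
        {
            Console.WriteLine("In use, disposed: " + holder.IsDisposed);
        }
        Console.WriteLine("After using, disposed: " + holder.IsDisposed);
    }
}
```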

Dependency Injection (DI)

A foundational pattern in xUnit testing is Dependency Injection (DI), which is a form of Inversion of Control (IoC). Specifically, DI is a programming technique where the implementation of one class is actually performed partially by another. Inversion of Control is where a program gives up control of its own execution and simply responds to requests made of it. In the same way, a class using dependency injection gives up control over some of its implementation and lets the injected class do the work.

Three types of DI exist: constructor injection, property injection, and method call injection. Constructor injection is a pattern whereby dependencies are “injected” into an object during construction. Property injection, by contrast, uses set operations [on properties] to inject dependencies, which obviously occurs after construction. Method call injection simply uses method calls to inject dependencies.
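
The three styles can be placed side by side in one small class. The ILogger, ListLogger, and ReportService names below are hypothetical and exist only for this sketch; no DI container is involved.

```csharp
using System;
using System.Collections.Generic;

public interface ILogger
{
    void Write(string message);
}

// Hypothetical test double that records what was written to it.
public sealed class ListLogger : ILogger
{
    public readonly List<string> Messages = new List<string>();

    public void Write(string message)
    {
        Messages.Add(message);
    }
}

public sealed class ReportService
{
    private readonly ILogger _logger;

    // Constructor injection: the dependency is required at construction.
    public ReportService(ILogger logger)
    {
        if (logger == null)
        {
            throw new ArgumentNullException("logger");
        }
        _logger = logger;
    }

    // Property injection: an optional dependency set after construction.
    public ILogger AuditLogger { get; set; }

    // Method call injection: the dependency is supplied per call.
    public void Run(string reportName, ILogger progress)
    {
        _logger.Write("start " + reportName);
        if (AuditLogger != null)
        {
            AuditLogger.Write("audit " + reportName);
        }
        progress.Write("done " + reportName);
    }
}

public static class InjectionDemo
{
    public static void Main()
    {
        ListLogger main = new ListLogger();
        ListLogger audit = new ListLogger();
        ListLogger progress = new ListLogger();

        ReportService service = new ReportService(main);
        service.AuditLogger = audit;
        service.Run("daily", progress);

        Console.WriteLine(main.Messages[0]);
        Console.WriteLine(audit.Messages[0]);
        Console.WriteLine(progress.Messages[0]);
    }
}
```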

Several libraries exist in each category. For example, the Castle Project and ObjectBuilder both provide support for constructor and property injection. As a side note, the Microsoft Enterprise Library uses ObjectBuilder internally as its DI container framework. Unity is Microsoft’s latest offering from the patterns & practices team. Unity offers property, constructor, and method call injection.

Static Classes

A static class does not have any instance-level members (including constructors) and is defined by the static modifier. The compiler in turn marks a static class as both abstract and sealed in the generated metadata, so it can be neither instantiated nor derived from.

The main features of a static class are:

  1. They only contain static members.
  2. They cannot be instantiated.
  3. They are sealed.
  4. They cannot contain Instance Constructors.

Inherently in the context of unit (and integration) testing, static classes are not recommended due to the difficulty in implementing a thread-safe property injector DI pattern and being unable to use constructor DI. However, the advantage of a properly written static class is there’s only one object instance per AppDomain, which can be very efficient on the GC (see the Too Many Object Writes section of the previously referenced article).

This is a very important and annoying impedance that exists between the use of a constructor DI pattern and efficient use (and preservation) of system resources (in this case CPU and memory). We want to be testable and mindful of system resources.

Singletons

For purposes of this discussion, a singleton is a class that only allows a single instance of itself to be created within an AppDomain. The following are recommended singleton patterns.

Sidebar: Understand the Impact of Low-Lock Techniques in Multithreaded Apps

Pattern 1: Niladic Constructor

public class MySingleton
{
    private static readonly MySingleton _instance = new MySingleton();

    private MySingleton()
    {
    }

    public static MySingleton Instance
    {
        get { return _instance; }
    }
}

Pattern 2: Niladic Constructor (Full Lazy Initialization)

public class MySingleton
{
    class Nested
    {
        //
        // Explicit static constructor to tell C# compiler not to mark
        // type as BeforeFieldInit
        //
        static Nested()
        {
        }

        internal static readonly MySingleton Instance = new MySingleton();
    }

    private MySingleton()
    {
    }

    public static MySingleton Instance
    {
        get { return Nested.Instance; }
    }
}

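Note that the following alternative is more compact but does not result in identical MSIL: without the explicit static constructor, the C# compiler marks Nested as BeforeFieldInit, so the runtime is free to run its initializer at any point before the first access to Nested.Instance…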

public class MySingleton
{
    static class Nested
    {
        internal static readonly MySingleton Instance = new MySingleton();
    }

    private MySingleton()
    {
    }

    public static MySingleton Instance
    {
        get { return Nested.Instance; }
    }
}

A rather dated but still applicable discussion regarding the use of the double-check lock pattern written by Vance Morrison can be found here and provides insight into subtle nuances of the Microsoft .NET Framework memory model implementation. I’ve attached a PDF version of the discussion to this entry (at the bottom) in case the archive disappears.
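
For reference, the commonly cited form of the double-check lock pattern looks like the sketch below. It is not reproduced from the referenced discussion; the LazySingleton name is hypothetical, and marking the field volatile is the conservative choice rather than a statement about what the memory model strictly requires.

```csharp
using System;

public sealed class LazySingleton
{
    private static readonly object _sync = new object();

    // volatile keeps the read in the first check from being reordered
    // or cached in a way that could expose a partially built instance.
    private static volatile LazySingleton _instance;

    private LazySingleton()
    {
    }

    public static LazySingleton Instance
    {
        get
        {
            // First check: avoid taking the lock once the instance exists.
            if (_instance == null)
            {
                lock (_sync)
                {
                    // Second check: another thread may have won the race.
                    if (_instance == null)
                    {
                        _instance = new LazySingleton();
                    }
                }
            }
            return _instance;
        }
    }
}

public static class LazySingletonDemo
{
    public static void Main()
    {
        bool same = object.ReferenceEquals(LazySingleton.Instance, LazySingleton.Instance);
        Console.WriteLine("Same instance: " + same);
    }
}
```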

Singleton<T>

So back to the question at hand, which is how do I ensure my production code is testable and uses system resources as efficiently as possible? I offer the Singleton<T> class, which is defined as…

public sealed class Singleton<T> where T : new()
{
    private static readonly T _instance = new T();

    private Singleton()
    {
    }

    public static T Instance
    {
        get { return _instance; }
    }
}

With the Singleton<T> class, we’re now able to use the following syntax in production code…

public class CustomerManager : BusinessLogicComponent
{
    private ICustomerRepository _repository;

    //
    // This constructor is required by the Singleton generic class.
    //
    [EditorBrowsable(EditorBrowsableState.Never)]
    public CustomerManager()
        : this(Singleton<CustomerRepository>.Instance)
    {
    }

    //
    //  This constructor is to be used only by test projects.
    //
    [EditorBrowsable(EditorBrowsableState.Never)]
    public CustomerManager(ICustomerRepository repository)
    {
        _repository = repository;
    }

    public void Add(Customer customer)
    {
        _repository.Add(customer);
    }
}

…and the following syntax from test projects.

[Test]
public void Customer_Add_Succeeds()
{
    //
    // Create a mock customer repository and inject it into the business
    // logic component (manager).
    //
    ICustomerRepository repository = new MockCustomerRepository();
    CustomerManager manager = new CustomerManager(repository);

    //
    // Perform remaining test operations.
    //
}

Consumers of the CustomerManager business logic class in production simply use the Singleton<T> class just as the CustomerManager does with the CustomerRepository class in its niladic constructor.
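
A self-contained sketch of that production usage follows. The ICustomerRepository and CustomerRepository types here are simplified stand-ins that I made up for illustration (customers are plain strings and the repository is an in-memory list that is not thread-safe); only the Singleton<T> shape follows the definition above.

```csharp
using System;
using System.Collections.Generic;

public sealed class Singleton<T> where T : new()
{
    private static readonly T _instance = new T();

    private Singleton()
    {
    }

    public static T Instance
    {
        get { return _instance; }
    }
}

// Hypothetical, simplified repository contract for this sketch.
public interface ICustomerRepository
{
    void Add(string customer);
    int Count { get; }
}

public class CustomerRepository : ICustomerRepository
{
    private readonly List<string> _customers = new List<string>();

    public void Add(string customer)
    {
        _customers.Add(customer);
    }

    public int Count
    {
        get { return _customers.Count; }
    }
}

public class CustomerManager
{
    private readonly ICustomerRepository _repository;

    // Used by Singleton<CustomerManager> in production code.
    public CustomerManager()
        : this(Singleton<CustomerRepository>.Instance)
    {
    }

    // Used by tests to inject a mock repository.
    public CustomerManager(ICustomerRepository repository)
    {
        _repository = repository;
    }

    public void Add(string customer)
    {
        _repository.Add(customer);
    }
}

public static class SingletonDemo
{
    public static void Main()
    {
        // Production consumers go through Singleton<T> rather than "new".
        CustomerManager manager = Singleton<CustomerManager>.Instance;
        manager.Add("Contoso");

        Console.WriteLine("Same manager: " +
            object.ReferenceEquals(manager, Singleton<CustomerManager>.Instance));
        Console.WriteLine("Customers stored: " + Singleton<CustomerRepository>.Instance.Count);
    }
}
```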

A final note on thread-safety. The singleton pattern does require all instance level methods to be thread-safe.

Conclusion

The Singleton<T> class provides an implementation pattern to balance between two important variables in any design; that of testability and efficient use of system resources. I encourage you to experiment.

Garbage Collector Basics and Performance Hints.zip

Tribute to Honor Jim Gray

A dear friend and former colleague notified me of an upcoming event for Jim. From Werner Vogels’ site:

On May 31 2008 a tribute will be held at UC Berkeley to honor Jim Gray, who went missing during a solo sailing trip in January of this year. Although Jim is listed as missing, and will be until 2011, a Tribute be held to honor him before too much time has passed. There are two parts to this event:

The morning event, which will be in a very large hall, is open and public; the technical session, which is in a smaller hall, and for which you need to register. More details at the Tribute website.

I expect to attend.

XRing DHT and Windows Communication Foundation (WCF)

A few weeks ago I provided a brief history of the P2P landscape in which I mentioned several Distributed Hash Table (DHT) implementations. The latest XRing DHT publication from Microsoft Research is dated September 2004. Substantial work has been done since that publication. To my knowledge, the only XRing DHT implementation exists within the Microsoft Research group and is not publicly available.

The XRing design is quite interesting. Unlike most DHT algorithms, it isn’t dominated by a minimalist approach; instead, it focuses on deployment situations where the churn rate (node join and leave rate) is low or the system is of moderate size (between 1 and approximately 1 million nodes). XRing uses a layered routing scheme to deliver O(log N) routing in the worst case and, usually, a 1-hop route to anywhere (2 hops for a million nodes); these are facilitated by the Finger Table and the Soft-State Routing Table (SSRT), respectively. The third routing table is the Leaf Set, which contains 2L+1 entries: the L nodes to probe on either side of the home node, plus the home node itself.
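
To illustrate the 2L+1 figure, the sketch below picks a leaf set out of a sorted list of node identifiers. It is my own simplification and not the XRing implementation: node identifiers are plain integers, the whole ring is assumed to be known, and LeafSetSketch is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;

public static class LeafSetSketch
{
    // Returns the home node together with up to l neighbours on each side
    // of it, walking the sorted identifiers as a ring.
    public static List<int> Build(IList<int> sortedNodeIds, int homeIndex, int l)
    {
        int n = sortedNodeIds.Count;
        List<int> leafSet = new List<int>();

        // A ring with at most 2L+1 nodes fits entirely in the leaf set.
        if (n <= 2 * l + 1)
        {
            leafSet.AddRange(sortedNodeIds);
            return leafSet;
        }

        for (int offset = -l; offset <= l; offset++)
        {
            // Wrap around both ends of the list.
            int index = ((homeIndex + offset) % n + n) % n;
            leafSet.Add(sortedNodeIds[index]);
        }
        return leafSet;
    }

    public static void Main()
    {
        int[] ring = { 10, 20, 30, 40, 50, 60, 70, 80 };
        List<int> leafSet = Build(ring, 0, 2);
        Console.WriteLine(string.Join(", ", leafSet.ConvertAll(id => id.ToString()).ToArray()));
    }
}
```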

Membership in an XRing DHT is guaranteed by a weak and eventual membership protocol, which employs a special anti-entropy protocol that allows the system to coalesce; equilibrium is eventually reached. The membership and routing layer is where all the fun happens. For my implementation, a node may join an existing XRing either (a) by sending a one-way multicast request or (b) by sending a point-to-point request to an existing member. For (a), one common pattern is to send the request over a multicast channel and the response back over a point-to-point binding, which maps wonderfully into WCF.

The multicast channel is realized by using the NetPeerTcpBinding class and the point-to-point channel by the NetTcpBinding class.

[ServiceContract(Namespace = "http://ws.idynamicscorp.com/P2P/Dht/XRing")]
public interface IXRingDhtDuplexMembership
{
	[OperationContract]
	void Join(DhtNodeId id);

	[OperationContract]
	void Leave(DhtNodeId id);
}

[ServiceContract(Namespace = "http://ws.idynamicscorp.com/P2P/Dht/XRing", CallbackContract = typeof(IXRingDhtMulticastMembership))]
public interface IXRingDhtMulticastMembership
{
	[OperationContract(IsOneWay = true)]
	void MulticastJoin(DhtNodeId id);

	[OperationContract(IsOneWay = true)]
	void MulticastLeave(DhtNodeId id);
}

An important design goal for any DHT implementation is that of ring simulation. Therefore, it’s important to allow a single process to support multiple nodes. Unfortunately, this meant I couldn’t use WCF configuration files but instead had to programmatically create the contract, binding, and endpoint address for each node. This took a bit of work to accomplish due to two primary reasons:

  1. The current documentation is not in sync with the bits.
  2. The WCF samples use configuration-defined bindings, contracts, and endpoints.

Incidentally, working with the WCF configuration files (due to bullet 1) reminded me of a quote from David Hansson that I read on Box’s blog but with a minor modification:

WCF configuration files feel more like the doorknob to the gates of hell. In itself, a doorknob is hardly evil. But once you turn…

Fortunately (for me at least), Lutz Roeder’s .NET Reflector filled in the gaps. After some caffeine, a few frustrating moments, and questioning why I even started down this path, I found myself with a working prototype.

If you run Windows 2003 R2 as the operating system on your primary development machine (as I do), then you currently have one more obstacle to overcome. Per the WCF peer channel team, the Peer Name Resolution Protocol (PNRP) service is currently available only on Windows XP (SP1 + networking pack installed), Windows XP (SP2), Windows Vista, and Windows XP Professional (64-bit) [URI]. Fortunately, you are able to leverage the CustomPeerResolver sample, which can be found in the “PeerChannelCustomPeerResolver” folder of the WinFX February CTP.

The following code fragment illustrates how to programmatically reference the custom peer resolver:

NetPeerTcpBinding binding = new NetPeerTcpBinding();
binding.Port = 4242;
binding.Security.Mode = SecurityMode.None;

//
// If the default PNRP service is not available then
// fall back to a custom peer resolver.
//
if (!NetPeerTcpBinding.IsPnrpAvailable)
{
	binding.Resolver.Custom.Address = new EndpointAddress("net.tcp://resolverUri");
	binding.Resolver.Custom.Binding = new NetTcpBinding(SecurityMode.None, true);
	binding.Resolver.Custom.Resolver = new CustomPeerResolver();
	binding.Resolver.Mode = PeerResolverMode.Custom;
}

Overall, I’m quite impressed with the flexibility of WCF and the usability that seemingly went into the consumer API surface. Hopefully, the documentation catches up to the bits.

Peer-to-Peer (P2P)

“In a forest a tree will fade; from a forest a tree is made.”

–Unknown

Over the past several years I’ve focused on application of P2P algorithms for the commercial space. Anyway, I thought I’d share some of my thoughts and tertiary research.

First, what is Peer-To-Peer (P2P)?  P2P [URI] is a decentralized, fault tolerant, self-organizing system architecture comprised of many unreliable and heterogeneous nodes operating in a functionally symmetric manner—frequent joins and leaves are the norm. P2P is not a client / server architecture. P2P is a paradigm shift from coordination to cooperation, from centralization to decentralization, and from control to incentives.

Algorithms

To my knowledge, the Plaxton mesh (1997) was the earliest known proposal for a scalable P2P network. One major problem with a Plaxton mesh is that it’s static: there is no concept of node arrival, departure, or failure. Essentially, nodes act as routers, clients, and servers simultaneously. A Plaxton mesh routes a message to the node whose name is numerically closest to the destination.

From 2001 through 2003, P2P algorithms such as Chord, CAN, Pastry, Tapestry, Viceroy, P-Grid, Kademlia, Koorde, SkipGraph, and SkipNet appeared.

Applications

Napster (www.napster.com) was one of the first P2P applications and appeared in 1999, though it’s not really a P2P network. With Napster, a single central server kept the directory of nodes and objects, which made it essentially a large file catalog; the discovery and routing algorithms are not P2P.

Gnutella first appeared in 2000 and then again in 2002. Though a pure P2P application, Gnutella generated far too much search traffic with a [broadcast-based] O(n) complexity—not very scalable.

eDonkey appeared in 2001 and presented a slight architectural improvement over Napster in that no single centralized server held the file catalog. Nevertheless, eDonkey was not a pure P2P application since it relied on status node structures.

Kazaa also appeared in late 2001 but I haven’t studied its architecture at all. I listed it here for completeness.

Gnutella-2 and BitTorrent were the P2P applications of 2002.  In 2003, Skype, Steam, and PS3 appeared.

Architecture

Common characteristics of P2P architectures are:

  • No central servers.
  • High level of scalability. Discovery and routing are O(log(n)). Routing table size is also O(log(n)).
  • Highly resilient.
  • Dynamic adaptability to node arrival, departure and failure.

Because of these characteristics, P2P architectures offer challenges in many areas. The most visible of the challenges facing designers of P2P routing algorithms are:

  • Scalability. Scalability is a measure of how a system performs when the number of nodes and/or number of messages on the network grows.
  • Computational Complexity. Computational complexity is the measure of the order of steps required for a packet to travel from one host to another in a worst case scenario.
  • Anonymity. Anonymity is not a requirement of most P2P networks; however, if a network is to be designed to provide anonymity, then this is a problem that must be solved at the routing level.

Topology

A P2P infrastructure is best described as an overlay network, usually on top of an IP network. In many ways, you can think of P2P as an application-level router providing services that underlying layers do not (e.g., IP multicast). Generally speaking, P2P architectures employ one of the following topological structural designs:

  1. Centralized (Napster)
  2. Decentralized
    1. Unstructured (Gnutella)
    2. Structured (Chord)
  3. Hierarchical (Multicast Backbone (MBone))
  4. Hybrid (eDonkey)

Different P2P algorithms employ different topological structures. For example, Chord employs a ring topology whereas Kademlia employs a tree and CAN a hypercube.

Routing

Different topologies require different algorithm tactics and strategies. Routing within P2P architectures is very problematic.

Within P2P architectures, algorithms such as flooding, replication and caching, random walkers and probabilistic algorithms, super-peers, Time to Live (TTL), epidemic and gossip protocols, propagation, and dampening algorithms are widely used. How well these algorithms work depends on the size of the P2P network. For example, flooding [http://www9.limewire.com/developer/gnutella_protocol_0.4.pdf] is better suited to small to medium-sized networks, and it has been shown that the cost of searching on a Gnutella-style network increases super-linearly as the number of nodes increases.
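
To get a feel for why flooding becomes expensive, the sketch below counts the query messages generated by a TTL-limited flood over a small overlay graph. It is a simplified model of my own (FloodingSketch is a hypothetical name, node identifiers are non-negative integers, and each node forwards a query only the first time it sees it), not the Gnutella wire protocol.

```csharp
using System;
using System.Collections.Generic;

public static class FloodingSketch
{
    // Floods a query from origin with the given TTL. Returns the number of
    // messages sent; reached receives the number of other nodes that saw it.
    public static int Flood(IDictionary<int, int[]> neighbours, int origin, int ttl, out int reached)
    {
        int messages = 0;
        HashSet<int> seen = new HashSet<int>();
        seen.Add(origin);

        // Each queue entry is { node, sender, remaining TTL }; -1 means "no sender".
        Queue<int[]> queue = new Queue<int[]>();
        queue.Enqueue(new[] { origin, -1, ttl });

        while (queue.Count > 0)
        {
            int[] item = queue.Dequeue();
            int node = item[0], sender = item[1], remaining = item[2];
            if (remaining == 0)
            {
                continue; // TTL exhausted: do not forward.
            }

            foreach (int next in neighbours[node])
            {
                if (next == sender)
                {
                    continue; // Never send the query back where it came from.
                }

                messages++; // One message crosses this link.
                if (seen.Add(next))
                {
                    // First sighting: this node will forward with a lower TTL.
                    queue.Enqueue(new[] { next, node, remaining - 1 });
                }
            }
        }

        reached = seen.Count - 1;
        return messages;
    }

    public static void Main()
    {
        // Four nodes connected in a ring: 0-1-2-3-0.
        Dictionary<int, int[]> ring = new Dictionary<int, int[]>
        {
            { 0, new[] { 1, 3 } },
            { 1, new[] { 0, 2 } },
            { 2, new[] { 1, 3 } },
            { 3, new[] { 2, 0 } }
        };

        int reached;
        int messages = Flood(ring, 0, 2, out reached);
        Console.WriteLine("messages=" + messages + ", reached=" + reached);
    }
}
```

Even in this tiny ring, one node receives the query twice; in denser overlays the share of redundant messages grows quickly.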

Distributed Hash Tables (DHT)

DHTs are a class of decentralized distributed systems that partition ownership of a set of keys among participating nodes, and can efficiently route messages to the unique owner of any given key. A hash function (such as SHA-1) accepts a variable length string of bytes and returns a one-way hash. The physical nodes are the hash buckets. Chord is a good example of a DHT algorithm.
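
The idea of nodes as hash buckets can be sketched in a few lines: hash both node names and keys onto the same ring, then assign each key to the first node at or after its position. The sketch is illustrative only. KeyOwnerSketch is a hypothetical name, the SHA-1 digest is truncated to 32 bits for brevity, and the successor rule is the Chord-style convention rather than something every DHT uses.

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;

public static class KeyOwnerSketch
{
    // Maps a string to a 32-bit ring position using the first four
    // bytes of its SHA-1 digest (truncated only to keep the sketch short).
    public static uint Position(string value)
    {
        using (SHA1 sha1 = SHA1.Create())
        {
            byte[] digest = sha1.ComputeHash(Encoding.UTF8.GetBytes(value));
            return ((uint)digest[0] << 24) | ((uint)digest[1] << 16) |
                   ((uint)digest[2] << 8) | (uint)digest[3];
        }
    }

    // The owner of a key is the first node at or after the key's position,
    // wrapping around to the lowest-positioned node.
    public static string Owner(IEnumerable<string> nodeNames, string key)
    {
        SortedDictionary<uint, string> ring = new SortedDictionary<uint, string>();
        foreach (string name in nodeNames)
        {
            ring[Position(name)] = name;
        }
        if (ring.Count == 0)
        {
            throw new ArgumentException("At least one node is required.", "nodeNames");
        }

        uint keyPosition = Position(key);
        string first = null;
        foreach (KeyValuePair<uint, string> entry in ring)
        {
            if (first == null)
            {
                first = entry.Value;
            }
            if (entry.Key >= keyPosition)
            {
                return entry.Value;
            }
        }
        return first;
    }

    public static void Main()
    {
        string[] nodes = { "node-a", "node-b", "node-c" };
        Console.WriteLine("urn:Financial:Stocks:Ticker is owned by " +
            Owner(nodes, "urn:Financial:Stocks:Ticker"));
    }
}
```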

Incidentally, in general DHT algorithms will always beat flooding algorithms in the area of routing.

One advantage of the Chord DHT algorithm is that it guarantees receipt of a reply within O(log(n)) time, which is a far better guarantee in terms of computational and time complexity than that offered by flooding techniques. That said, the Kademlia DHT algorithm does offer advantages over Chord depending on application requirements.
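
The logarithmic behaviour comes from finger tables: each node knows the successor of points 1, 2, 4, 8, ... identifiers ahead of itself, so each hop can skip a large part of the remaining distance. The following is a toy, single-process simulation under my own assumptions (ChordSketch is a hypothetical name, identifiers live in a small 2^bits space, membership is static, and fingers are computed on demand from a global node list, which a real node could not do).

```csharp
using System;

public sealed class ChordSketch
{
    private readonly int _bits;
    private readonly int _size;
    private readonly int[] _nodes; // sorted node identifiers in [0, 2^bits)

    public ChordSketch(int bits, int[] nodeIds)
    {
        _bits = bits;
        _size = 1 << bits;
        _nodes = (int[])nodeIds.Clone();
        Array.Sort(_nodes);
    }

    // First node whose identifier is >= id, wrapping around the ring.
    public int Successor(int id)
    {
        id = ((id % _size) + _size) % _size;
        foreach (int node in _nodes)
        {
            if (node >= id) return node;
        }
        return _nodes[0];
    }

    // True if x lies in the ring interval (a, b].
    private static bool InHalfOpen(int x, int a, int b)
    {
        return a < b ? (x > a && x <= b) : (x > a || x <= b);
    }

    // True if x lies in the ring interval (a, b).
    private static bool InOpen(int x, int a, int b)
    {
        return a < b ? (x > a && x < b) : (x > a || x < b);
    }

    // Finger i of a node is the successor of (node + 2^i).
    private int ClosestPrecedingFinger(int node, int key)
    {
        for (int i = _bits - 1; i >= 0; i--)
        {
            int finger = Successor(node + (1 << i));
            if (InOpen(finger, node, key)) return finger;
        }
        return node;
    }

    // Finds the node responsible for key, starting at node start.
    // hops counts how many times the query was forwarded to another node.
    public int Lookup(int start, int key, out int hops)
    {
        hops = 0;
        int current = start;
        while (!InHalfOpen(key, current, Successor(current + 1)))
        {
            int next = ClosestPrecedingFinger(current, key);
            if (next == current) break; // safety net; not expected to happen
            current = next;
            hops++;
        }
        return Successor(current + 1);
    }

    public static void Main()
    {
        int[] nodes = { 1, 8, 14, 21, 32, 38, 42, 48, 51, 56 };
        ChordSketch ring = new ChordSketch(6, nodes);

        int hops;
        int owner = ring.Lookup(8, 54, out hops);
        Console.WriteLine("owner=" + owner + ", hops=" + hops);
    }
}
```

In the published analysis the logarithmic bound holds with high probability and assumes the finger tables are reasonably up to date, which is exactly what churn makes hard.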

However, DHTs are not without problems:

  1. Routing state maintenance algorithms generate overhead.
  2. They do not work well when lots of nodes join and leave frequently (churn).
  3. Their structured topology and routing algorithms make them frail and vulnerable to security attacks.
  4. Network locality problem: Nodes numerically-close are not topologically-close.

Strategies do exist to address these problems. Regardless, structured DHT algorithms provide the most interesting P2P infrastructures.

Semantic Routing

This is a relatively new area of routing as related to P2P infrastructures. Semantic Routing is a method of routing which is more focused on the nature of the query to be routed than the network topology. Essentially, semantic routing improves on traditional routing by prioritizing nodes which have been previously good at providing information about the types of content referred to by the query.

Semantic routing differs fundamentally from other routing techniques because prospective nodes are selected on the basis of another node’s confidence in their ability to respond correctly to a given query, irrespective of their position within the network. Neurogrid and Remindin are among the most interesting semantic routing systems, with the Remindin system incorporating a semantic routing algorithm developed with the intention of mimicking social networks.

Summary

Overall, the most interesting of the DHT algorithms are Chord, Kademlia, and XRing.  I hope to release .NET implementations of all three of these algorithms (and a few variations) in the near future. You’ll find them here.

Fiefdoms: Cause and Effect

I ran across a very good book titled The Fiefdom Syndrome, which discusses the cause and effect of fiefdoms.

“The fiefdom syndrome stems from the inclination of managers and employees to become fixated on their own activities, their own careers, their own territory or turf to the detriment of those around them.

People who create fiefdoms can become dangerously insular, losing perspective on what is happening in the world outside their own control. They also lose their ability to act consistently on behalf of the greater good, or in a way that enhances the effectiveness of the larger organization. They often resist new situations and change.

People who create fiefdoms tend to hoard resources. They are determined to do things their own way, often duplicating or complicating what should be streamlined throughout the company, leading to runaway costs, increased bureaucracy and slower response times.

Organizations infected with fiefdoms tend to kill off or stifle individual creativity, leading to what I call the ‘freeze factor’: when organizations become frozen or stuck in place, letting competitors pass them by.”

Information Modeling

When defining an information model, should one favor an abstract or concrete design? The short answer is neither.

Generally speaking, the life of any distributed system directly relates to its level of entropy. Furthermore, the level of entropy in a system directly relates to the system’s computational complexity. When designing applications to solve business problems, information storage and retrieval is one of the more important foundational design points. If the information schema is designed without efficiency, scalability, and flexibility in mind then not only will the system perform poorly but it will not meet the requirements imposed by future business demands.

At either end of the spectrum, an information schema is categorized as either concrete or abstract—though there most certainly are varying degrees in between. Each design has advantages and disadvantages. A concrete design is easier to conceptualize and requires less complex algorithmic considerations. In most cases, a concrete design is far less flexible than that of an abstract design. A properly designed abstract information schema remains invariant in the face of change that would otherwise impose a relatively large amount of downtime for an equivalent concrete model.

Arguably, however, many abstract systems are indeed designed with concreteness in mind. By this I mean the system is designed to contain representations of concrete constructs. Why? It’s how humans think in everyday life. If I ask you to describe yourself then you’re statistically more likely to forgo attributes such as “person” or “human” and instead provide attributes such as height and weight. The former attributes are more abstract (relatively speaking) than the latter attributes. However, both indeed describe a concrete construct: you.

I tend to favor abstract information models but then again, I spend the majority of my time in abstract problem domains. However, regardless of which classification the solution domain requires, the following rules and considerations have proven effective in designing an efficient information model:

  1. Effectively understand the information.
  2. Describe it unambiguously.
  3. Enforce structure and style guidelines.
  4. Allow for efficient storage and retrieval of information.
  5. Keep network communication to a minimum. Don’t over engineer.

The last bullet (5) is delivered with a caveat. Over engineering is a very subjective quantification that takes time to perfect and is directly related to your complete understanding of the information (bullet 1). If bullet 1 suffers then so does bullet 5.

Usually, the questions are quick. It’s the answers that take the time.