Realtime web application using CQRS and websockets

I’ve started putting together a sample application with the following architecture:

There’s an ASP.NET MVC application that only serves GET requests. Each rendered view then communicates with a web socket server over web sockets. The client issues commands and listens and reacts to business events raised by the domain, which executes on the application server. The commands reach the application server via a service bus (MassTransit). The events generated by the aggregate roots in the domain are also put on the bus and are thereby delivered back to the web socket server, which notifies the correct sender. All events are stored in an event store. The events are also denormalized to custom read models (using SisoDb). A read model can be seen as a data model designed for one specific scenario, with e.g. precalculated aggregate values. The read models are written to one database, which is then replicated to two others: one serving reads in the web application and one serving reads in the web socket server.

I’ll keep writing about this and, when the time comes, it will be open sourced on GitHub.

//Daniel

LiteCQRS

I have been taking baby steps with CQRS lately in an enterprise application (both in web and WPF domains). After doing this for a while, using a fork of an existing CQRS “framework”, I just had to sit down and put something together myself. Just for fun. You should not see my project as a silver bullet or an out-of-the-box solution that tackles every problem for you. What you could use it for is perhaps some inspiration, or a good laugh.

For a better reading experience, have a look here: https://github.com/danielwertheim/LiteCQRS/wiki/Overview

LiteCQRS is a small, convention-based, CQRS-inspired project. It differs a bit from other frameworks in that it has no interfaces for command handlers and event handlers. Instead it looks at namespaces and class names, and then at the public method signatures. You can of course hook into this and determine the namespace and class names yourself.

NOTE!

You should not look at LiteCQRS as a complete out-of-the-box solution that will take care of every problem for you. E.g. there is no event store other than an in-memory one, and there is just a simple synchronous event publisher. The reason for this is that it is a code base you can fork and make behave as you want, e.g. by plugging in Jonathan Oliver's EventStore.

Quick start

The sample uses a domain (TickTack) that deals with time registration. By default, the namespaces and class names in the assembly or assemblies passed to the CQRS runtime should look like this.

TickTack.CommandHandlers

Any public method taking a single argument that implements ICommand is treated as a command handler.

namespace TickTack.CommandHandlers
{
    public class ProjectCommands
    {
        private readonly IDomainRepository _domainRepository;

        public ProjectCommands(IDomainRepository domainRepository)
        {
            _domainRepository = domainRepository;
        }

        public void StartNewProject(StartNewProject command)
        {
            var project = new Project(command.ProjectId);

            _domainRepository.Store(project);
        }

        public void RegisterActivity(RegisterActivity command)
        {
            var project = _domainRepository.GetById<Project>(command.ProjectId);

            project.RegisterActivity(command.Activity);

            _domainRepository.Store(project);
        }
    }
}
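
To make the convention concrete, here is a minimal sketch of how such a scan could be done with plain reflection. It is not how LiteCQRS actually does it: HandlerScanner, FindCommandHandlers, the namespaceSuffix parameter and the Demo types are hypothetical names made up for this illustration, and ICommand is redeclared as a stand-in so that the block compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Stand-in for the library's marker interface, so the sketch compiles alone.
public interface ICommand { }

// Hypothetical helper, not part of LiteCQRS.
public static class HandlerScanner
{
    // Maps each command type to the public method that takes it as its only argument.
    public static Dictionary<Type, MethodInfo> FindCommandHandlers(
        Assembly assembly, string namespaceSuffix = ".CommandHandlers")
    {
        var handlers = new Dictionary<Type, MethodInfo>();

        // Only concrete classes living in a namespace that ends with the suffix.
        var handlerClasses = assembly.GetTypes().Where(t =>
            t.IsClass && !t.IsAbstract &&
            t.Namespace != null &&
            t.Namespace.EndsWith(namespaceSuffix, StringComparison.Ordinal));

        foreach (var handlerClass in handlerClasses)
        {
            var methods = handlerClass.GetMethods(
                BindingFlags.Public | BindingFlags.Instance | BindingFlags.DeclaredOnly);

            foreach (var method in methods)
            {
                var parameters = method.GetParameters();

                // Skip property accessors and anything not shaped like a handler.
                if (method.IsSpecialName || parameters.Length != 1) continue;

                var commandType = parameters[0].ParameterType;
                if (!typeof(ICommand).IsAssignableFrom(commandType)) continue;

                // Last registration wins if two methods take the same command.
                handlers[commandType] = method;
            }
        }

        return handlers;
    }
}

namespace Demo.Commands
{
    public class Ping : ICommand { }
}

namespace Demo.CommandHandlers
{
    public class PingCommands
    {
        public void Handle(Demo.Commands.Ping command) { }
    }
}
```

Calling HandlerScanner.FindCommandHandlers(typeof(Demo.CommandHandlers.PingCommands).Assembly) gives a dictionary with one entry, mapping Ping to PingCommands.Handle. A command bus could then look up the method by the type of the incoming command and invoke it on an instance created by the IoC container.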

TickTack.EventHandlers

Any public method taking a single argument that implements IEvent is treated as an event handler.

namespace TickTack.EventHandlers
{
    public class ProjectEvents
    {
        public void ProjectStarted(ProjectStarted e)
        {
            Console.WriteLine("Project started");
        }

        public void ActivityRegistered(ActivityRegistered e)
        {
            Console.WriteLine("Activity registered");
        }
    }
}
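
As an illustration of what a simple synchronous event publisher could look like, here is a small sketch that calls every matching handler method directly, on the calling thread. It is an assumption about the general idea, not the LiteCQRS source: SynchronousEventPublisher, Register and Publish are hypothetical names, and IEvent, ProjectStarted and ProjectEvents are redeclared as stand-ins so that the block compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Stand-ins so the sketch compiles alone.
public interface IEvent { }
public class ProjectStarted : IEvent { }

public class ProjectEvents
{
    // Static log only so the demo can observe that the handler ran.
    public static readonly List<string> Log = new List<string>();

    public void ProjectStarted(ProjectStarted e)
    {
        Log.Add("Project started");
    }
}

// Hypothetical publisher, not part of LiteCQRS.
public class SynchronousEventPublisher
{
    private readonly Dictionary<Type, List<MethodInfo>> _handlers =
        new Dictionary<Type, List<MethodInfo>>();
    private readonly Func<Type, object> _handlerFactory;

    // The factory plays the same role as an IoC lookup such as Kernel.Get(t).
    public SynchronousEventPublisher(Func<Type, object> handlerFactory)
    {
        _handlerFactory = handlerFactory;
    }

    // Registers every public method on the class that takes a single IEvent.
    public void Register(Type handlerClass)
    {
        var methods = handlerClass.GetMethods(
            BindingFlags.Public | BindingFlags.Instance | BindingFlags.DeclaredOnly);

        foreach (var method in methods)
        {
            var parameters = method.GetParameters();
            if (method.IsSpecialName || parameters.Length != 1) continue;

            var eventType = parameters[0].ParameterType;
            if (!typeof(IEvent).IsAssignableFrom(eventType)) continue;

            List<MethodInfo> list;
            if (!_handlers.TryGetValue(eventType, out list))
            {
                list = new List<MethodInfo>();
                _handlers[eventType] = list;
            }
            list.Add(method);
        }
    }

    // Invokes all handlers for the event's exact type, one after the other.
    public void Publish(IEvent e)
    {
        List<MethodInfo> methods;
        if (!_handlers.TryGetValue(e.GetType(), out methods)) return;

        foreach (var method in methods)
        {
            var handler = _handlerFactory(method.DeclaringType);
            method.Invoke(handler, new object[] { e });
        }
    }
}
```

Usage would be along the lines of: create the publisher with t => Activator.CreateInstance(t) as factory, call Register(typeof(ProjectEvents)) and then Publish(new ProjectStarted()). Since everything runs synchronously, an exception in one handler stops the remaining handlers, which is one reason you might want to swap this part out in a fork.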

TickTack.Commands

namespace TickTack.Commands
{
    [Serializable]
    public class StartNewProject : ICommand
    {
        public Guid ProjectId { get; private set; }

        public string Name { get; private set; }

        public StartNewProject(Guid projectId, string name)
        {
            Ensure.That(projectId, "projectId").IsNotEmpty();
            Ensure.That(name, "name").IsNotNullOrWhiteSpace();

            ProjectId = projectId;
            Name = name;
        }
    }

    [Serializable]
    public class RegisterActivity : ICommand
    {
        public Guid ProjectId { get; private set; }

        public Activity Activity { get; private set; }

        public RegisterActivity(Guid projectId, Activity activity)
        {
            Ensure.That(projectId, "projectId").IsNotEmpty();
            Ensure.That(activity, "activity").IsNotNull();

            ProjectId = projectId;
            Activity = activity;
        }
    }
}

TickTack.Events

namespace TickTack.Events
{
    public class ProjectStarted : IEvent
    {
        public Guid AggregateRootId { get; private set; }

        public ProjectStarted(Guid projectId)
        {
            AggregateRootId = projectId;
        }
    }

    public class ActivityRegistered : IEvent
    {
        public Guid AggregateRootId { get; private set; }

        public Activity Activity { get; private set; }

        public ActivityRegistered(Guid projectId, Activity activity)
        {
            AggregateRootId = projectId;
            Activity = activity;
        }
    }
}

TickTack.Domain

An aggregate root should extend AggregateRoot. State changes should be applied in event handlers inside the aggregate, e.g. protected virtual void OnProjectStarted(ProjectStarted e). You raise an event by passing it to Raise.

namespace TickTack.Domain
{
    public class Project : AggregateRoot
    {
        private readonly List<Activity> _activities;

        private Project()
        {
            _activities = new List<Activity>();
        }

        public Project(Guid id)
            : this()
        {
            Raise(new ProjectStarted(id));
        }

        public void RegisterActivity(Activity activity)
        {
            Raise(new ActivityRegistered(Id, activity));
        }

        protected virtual void OnProjectStarted(ProjectStarted e)
        {
            Id = e.AggregateRootId;
        }

        protected virtual void OnActivityRegistered(ActivityRegistered e)
        {
            _activities.Add(e.Activity);
        }
    }

    public class Activity
    {
        public string Name { get; private set; }

        public DateTime Date { get; private set; }

        public int Minutes { get; private set; }

        public Activity(string name, DateTime date, int minutes)
        {
            Ensure.That(name, "name").IsNotNullOrWhiteSpace();
            Ensure.That(date, "date").IsInRange(
                DateTime.Now.Date.AddDays(-date.Day),
                DateTime.Now.Date);
            Ensure.That(minutes, "minutes").IsInRange(0, 1440);

            Name = name;
            Date = date;
            Minutes = minutes;
        }
    }
}
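
If you wonder how Raise can end up in OnProjectStarted, here is one way a base class could wire that up. This is a sketch under my own assumptions and not the actual AggregateRoot in LiteCQRS: AggregateRootSketch, UncommittedEvents, LoadFromHistory and ProjectSketch are hypothetical names, and the "On" + event class name lookup is simply inferred from the handler names used in the sample above.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Stand-in so the sketch compiles alone.
public interface IEvent { }

// Hypothetical base class, not the real AggregateRoot.
public abstract class AggregateRootSketch
{
    private readonly List<IEvent> _uncommitted = new List<IEvent>();

    public Guid Id { get; protected set; }

    // Events raised since the aggregate was created or loaded.
    public IReadOnlyList<IEvent> UncommittedEvents
    {
        get { return _uncommitted; }
    }

    // Applies the state change first, then records the event for storing.
    protected void Raise(IEvent e)
    {
        Apply(e);
        _uncommitted.Add(e);
    }

    // Rebuilds state from stored events without recording them again.
    public void LoadFromHistory(IEnumerable<IEvent> history)
    {
        foreach (var e in history) Apply(e);
    }

    // Finds a method named "On" + event class name taking that event type.
    private void Apply(IEvent e)
    {
        var method = GetType().GetMethod(
            "On" + e.GetType().Name,
            BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic,
            null, new[] { e.GetType() }, null);

        if (method == null)
            throw new InvalidOperationException(
                "No handler On" + e.GetType().Name + " on " + GetType().Name);

        method.Invoke(this, new object[] { e });
    }
}

public class ProjectStarted : IEvent
{
    public Guid AggregateRootId { get; private set; }

    public ProjectStarted(Guid projectId)
    {
        AggregateRootId = projectId;
    }
}

public class ProjectSketch : AggregateRootSketch
{
    // Parameterless constructor used when rebuilding from history.
    public ProjectSketch() { }

    public ProjectSketch(Guid id)
    {
        Raise(new ProjectStarted(id));
    }

    protected virtual void OnProjectStarted(ProjectStarted e)
    {
        Id = e.AggregateRootId;
    }
}
```

The point of routing every state change through an On-method is that the same code runs both when the event is first raised and when the aggregate is later rebuilt from the event store. E.g. new ProjectSketch(id) has one uncommitted event, and feeding that event to LoadFromHistory on an empty ProjectSketch gives the same Id with no uncommitted events.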

The CQRS Runtime

The two most important things you need to do are to tell the CqrsRuntimeBuilder how it should create command handlers and how it should create event handlers. The easiest way is to assign a Func that acts as a factory. The example below simply delegates to the IoC container, since we want it to inject dependencies into our handlers.

The rest of the config is, I hope, self-explanatory.

AssemblyScanConfig

This is used to scan for command handlers and event handlers. It has a namespace filter Func that you can use to control the namespace location of e.g. CommandHandlers.

public class ApiCqrsRuntimeConfig : NinjectModule
{
	public override void Load()
	{
		//If the Funcs for factories scare you,
		//you could also inherit and override virtual methods to control creation
		//of resources
		var cqrsRuntimeBuilder = new CqrsRuntimeBuilder
		{
			CommandHandlerContainerFactory = t => Kernel.Get(t), 
			EventHandlerContainerFactory = t => Kernel.Get(t)
		};

		//At least one assembly needs to be scanned for command handlers
		var commandHandlersConfig = new AssemblyScanConfig(typeof(CommandHandlers.ProjectCommands).Assembly);
			
		//At least one assembly needs to be scanned for event handlers
		var eventHandlersConfig = new AssemblyScanConfig(typeof(EventHandlers.ProjectEvents).Assembly);

		//Build runtime
		var cqrsRuntime = cqrsRuntimeBuilder.Build(new[] { commandHandlersConfig }, new[] { eventHandlersConfig });

		//Register runtime parts so that they can be injected in API context.
		Kernel.Bind<ICommandBus>()
			.ToMethod(ctx => cqrsRuntime.CommandBus);
		Kernel.Bind<IEventApplier>()
			.ToMethod(ctx => cqrsRuntime.EventApplier);
		Kernel.Bind<IEventPublisher>()
			.ToMethod(ctx => cqrsRuntime.EventPublisher);
		Kernel.Bind<IEventStore>()
			.ToMethod(ctx => cqrsRuntime.EventStore);
		Kernel.Bind<IDomainRepository>()
			.ToMethod(ctx => cqrsRuntime.GetDomainRepository());
	}
}