Software Design Patterns

Florian Rappl, Fakultät für Physik, Universität Regensburg

Introduction to modern software architecture

Concurrency patterns

Introduction

  • They deal with the multi-threaded programming paradigm
  • More generally, however, they describe asynchronous processes
  • Asynchronous does not necessarily mean multi-threaded
  • Often one does not need threads for async behavior
  • Goal: Reduce communication overhead, while increasing flexibility
  • Manage program resources more efficiently

Threading overview

[Figure: threading overview (threading.png)]

Problems and primitives

  • Multi-threading might end in race conditions
  • A race condition occurs when two or more threads access a shared resource at the same time and the result depends on the timing
  • Memory inconsistencies, such as instruction re-ordering or only partially constructed objects, might also arise
  • To prevent this we can use mutex objects
  • A mutex grants mutually exclusive access and is the basis for e.g. barriers, ...
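
The race condition and its mutex-based fix can be sketched in Java as follows. This is only an illustration: `Counter` and `CounterDemo` are hypothetical names, and `ReentrantLock` from the standard library plays the role of the mutex.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical shared resource: a counter incremented by several threads.
class Counter {
    private final ReentrantLock mutex = new ReentrantLock();
    private int value = 0;

    // Only one thread at a time may execute the read-modify-write step.
    void increment() {
        mutex.lock();
        try {
            value++; // the critical section
        } finally {
            mutex.unlock();
        }
    }

    int get() {
        mutex.lock();
        try {
            return value;
        } finally {
            mutex.unlock();
        }
    }
}

class CounterDemo {
    // Starts `threads` threads that each increment the counter `perThread` times.
    static int run(int threads, int perThread) {
        Counter counter = new Counter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++)
                    counter.increment();
            });
            workers[i].start();
        }
        try {
            for (Thread t : workers)
                t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000)); // prints 40000
    }
}
```

Without the lock, two threads could read the same old value and both write back the same new one, so some increments would be lost.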

The scheduler pattern

  • Old problem: How to synchronize access?
  • Special case: Limit access to a specific resource to a single thread
  • The result is that access to the resource is always synchronized
  • Use-case: Synchronized access to the elements in a GUI
  • The scheduler pattern provides the basis for sequencing waiting threads
  • It is similar to the mediator pattern, with the various threads being mediated

Scheduler diagram

skinparam classAttributeIconSize 0
interface Scheduler {
  dispatch()
  enqueue(Worker)
}
interface Worker {
  run()
}
class ConcreteScheduler implements Scheduler {
  +dispatch()
  +enqueue(Worker)
}
class ThreadScheduler {
  -scheduler: Scheduler
  -thread: Thread
  +run()
}
ThreadScheduler .> Scheduler 
Scheduler -> Worker

Remarks

  • Usually a mechanism to implement a scheduling policy is provided
  • This mechanism is defined in an abstract class
  • The pattern is independent of a specific scheduling policy
  • This policy might be FIFO, FILO, priority queue, ... or customized
  • This pattern adds some overhead beyond a barrier or similar locking techniques
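
That the pattern is independent of the scheduling policy can be sketched as follows. The names `Job`, `PolicyScheduler` and `PolicyDemo` are hypothetical; the assumption is that the policy is expressed by the queue implementation handed to the scheduler.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

// Hypothetical work item: a name plus a priority (lower value = more urgent).
class Job {
    final String name;
    final int priority;

    Job(String name, int priority) {
        this.name = name;
        this.priority = priority;
    }
}

// The scheduler only knows "a queue"; the queue implementation is the policy.
class PolicyScheduler {
    private final Queue<Job> queue;

    PolicyScheduler(Queue<Job> policy) {
        this.queue = policy;
    }

    synchronized void enqueue(Job job) {
        queue.add(job);
    }

    // Returns the next job according to the policy, or null if none is waiting.
    synchronized Job dispatch() {
        return queue.poll();
    }
}

class PolicyDemo {
    static Queue<Job> fifoPolicy() {
        return new ArrayDeque<Job>();
    }

    static Queue<Job> priorityPolicy() {
        return new PriorityQueue<Job>(Comparator.comparingInt((Job j) -> j.priority));
    }

    // Enqueues three jobs and returns their names in dispatch order.
    static List<String> order(Queue<Job> policy) {
        PolicyScheduler scheduler = new PolicyScheduler(policy);
        scheduler.enqueue(new Job("log", 3));
        scheduler.enqueue(new Job("render", 1));
        scheduler.enqueue(new Job("save", 2));

        List<String> names = new ArrayList<>();
        for (Job job = scheduler.dispatch(); job != null; job = scheduler.dispatch())
            names.add(job.name);
        return names;
    }

    public static void main(String[] args) {
        System.out.println(order(fifoPolicy()));     // [log, render, save]
        System.out.println(order(priorityPolicy())); // [render, save, log]
    }
}
```

The scheduler code is identical in both runs; only the queue passed to the constructor changes.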

Sample implementation

// C#
interface IWorker
{
	void Run();
}

interface IScheduler
{
	void Dispatch();
	void Enqueue(IWorker worker);
}

class ThreadScheduler
{
	IScheduler scheduler;
	Thread thread;
	
	public ThreadScheduler(IScheduler scheduler)
	{
		this.scheduler = scheduler;
		this.thread = new Thread(Loop);
	}
	
	public void Run()
	{
		thread.Start();
	}
	
	void Loop()
	{
		while (true)
		{
			scheduler.Dispatch();
			Thread.Sleep(10);
		}
	}
}

class QueueScheduler : IScheduler
{
	Queue<IWorker> queue;
	Object key;
	
	public QueueScheduler()
	{
		queue = new Queue<IWorker>();
		key = new Object();
	}
	
	public void Dispatch()
	{
		IWorker process;
	
		lock (key)
		{
			if (queue.Count == 0)
				return;
				
			process = queue.Dequeue();
		}
		
		process.Run();
	}
	
	public void Enqueue(IWorker worker)
	{
		lock (key)
		{
			queue.Enqueue(worker);
		}
	}
}

class SleepyWorker : IWorker
{
	public void Run()
	{
		Console.WriteLine("Sleeping 1s...");
		Thread.Sleep(1000);
		Console.WriteLine("Finished sleeping!");
	}
}

class WorkingWorker : IWorker
{
	public void Run()
	{
		Console.WriteLine("Doing hard work for 2s...");
		Thread.Sleep(2000);
		Console.WriteLine("Finished doing work!");
	}
}

// Java
interface Worker {
	void run();
}

interface Scheduler {
	void dispatch();
	void enqueue(Worker worker);
}

class ThreadScheduler {
	Scheduler scheduler;
	Thread thread;
	
	public ThreadScheduler(Scheduler scheduler) {
		this.scheduler = scheduler;
		this.thread = new Thread(new Loop());
	}
	
	public void run() {
		thread.start();
	}
	
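	class Loop implements Runnable {
		public void run() {
			try {
				while (true) {
					scheduler.dispatch();
					Thread.sleep(10);
				}
			} catch (InterruptedException e) {
				// Thread.sleep throws a checked exception; stop when interrupted
				Thread.currentThread().interrupt();
			}
		}
	}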
}

class QueueScheduler implements Scheduler {
	Queue<Worker> queue;
	Object key;
	
	public QueueScheduler() {
		queue = new LinkedList<Worker>();
		key = new Object();
	}
	
	public void dispatch() {
		Worker process = null;
	
		synchronized (key) {
			process = queue.poll();
		}

		if (process == null)
			return;
		
		process.run();
	}
	
	public void enqueue(Worker worker) {
		synchronized (key) {
			queue.add(worker);
		}
	}
}

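class SleepyWorker implements Worker {
	public void run() {
		System.out.println("Sleeping 1s...");
		try { Thread.sleep(1000); }
		catch (InterruptedException e) { Thread.currentThread().interrupt(); }
		System.out.println("Finished sleeping!");
	}
}

class WorkingWorker implements Worker {
	public void run() {
		System.out.println("Doing hard work for 2s...");
		try { Thread.sleep(2000); }
		catch (InterruptedException e) { Thread.currentThread().interrupt(); }
		System.out.println("Finished doing work!");
	}
}

// C++
class Worker {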
public:
	virtual void Run() = 0;
};

class Scheduler {
public:
	virtual void Dispatch() = 0;
	virtual void Enqueue(Worker *worker) = 0;
};

class ThreadScheduler {
private:
	thread *t;
	
	static void Loop(Scheduler *scheduler) {
		chrono::milliseconds ten(10);

		while (true) {
			scheduler->Dispatch();
			this_thread::sleep_for(ten);
		}
	}
	
public:
	ThreadScheduler(Scheduler &scheduler) {
		t = new thread(Loop, &scheduler);
	}
	
	void Run() {
		t->detach();
	}
};

class QueueScheduler : public Scheduler {
private:
	queue<Worker*> *workers;
	mutex key;
	
public:
	QueueScheduler() {
		workers = new queue<Worker*>();
	}
	
	void Dispatch() {
		Worker *process = NULL;
		key.lock();

		if (workers->empty()) {
			// Release the mutex before leaving, otherwise Enqueue would block forever
			key.unlock();
			return;
		}
			
		process = workers->front();
		workers->pop();
		key.unlock();
		process->Run();
	}
	
	void Enqueue(Worker *worker) {
		key.lock();
		workers->push(worker);
		key.unlock();
	}
};

class SleepyWorker : public Worker {
public:
	void Run() {
		chrono::seconds one(1);
		cout << "Sleeping 1s..." << endl;
		this_thread::sleep_for(one);
		cout << "Finished sleeping!" << endl;
	}
};

class WorkingWorker : public Worker {
public:
	void Run() {
		chrono::seconds two(2);
		cout << "Doing hard work for 2s..." << endl;
		this_thread::sleep_for(two);
		cout << "Finished doing work!" << endl;
	}
};

Practical considerations

  • The read/write lock pattern can be implemented using the scheduler pattern
  • The .NET TPL uses this pattern to provide a scheduler for tasks
  • Also Intel TBB offers a task scheduler, which can also be used to accommodate a given worker size
  • In general schedulers might be used for various scenarios, not only if a certain task needs to be executed single-threaded

The monitor pattern

  • Synchronization is important and sometimes a certain condition has to be fulfilled for allowing thread(s) to continue
  • The monitor object pattern allows both:
    1. mutual exclusion
    2. ability to wait for a certain condition
  • Therefore this pattern is thread-safe and condition-driven
  • Additionally we might want to inform a thread about condition-changes

Monitor diagram

skinparam classAttributeIconSize 0

class Monitor {
  +syncMethod1()
  +syncMethodN()
}
class MonitorCondition {
  +wait()
  +notify()
}
class MonitorLock {
  +acquire()
  +release()
}
Monitor *--> "1..*" MonitorCondition
Monitor *--> MonitorLock
Client "2..*" .> Monitor
MonitorCondition -> MonitorLock

Remarks

  • A condition variable consists of threads that are waiting on a certain condition to be fulfilled
  • Monitors provide a mechanism for threads to temporarily give up exclusive access
  • This is done in order to wait for some condition to be met
  • After the condition is met the exclusive access is regained
  • Their task is then continued (threads are resumed)
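
Giving up exclusive access while waiting for a condition can be sketched with the standard `ReentrantLock` and `Condition` classes. `TickTock` and `TickTockDemo` are hypothetical names, and the strict alternation of the two methods is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical monitor object: tick() and tock() must alternate strictly.
class TickTock {
    private final ReentrantLock lock = new ReentrantLock();    // mutual exclusion
    private final Condition turnChanged = lock.newCondition(); // the waiting room
    private final List<String> log = new ArrayList<>();
    private boolean tickTurn = true;

    void tick() throws InterruptedException { step(true, "Tick"); }

    void tock() throws InterruptedException { step(false, "Tock"); }

    private void step(boolean wantTickTurn, String word) throws InterruptedException {
        lock.lock();
        try {
            // await() releases the lock while waiting and regains it before returning
            while (tickTurn != wantTickTurn)
                turnChanged.await();
            log.add(word);
            tickTurn = !tickTurn;
            turnChanged.signalAll(); // inform waiting threads about the change
        } finally {
            lock.unlock();
        }
    }

    List<String> snapshot() {
        lock.lock();
        try {
            return new ArrayList<>(log);
        } finally {
            lock.unlock();
        }
    }
}

class TickTockDemo {
    static List<String> run(int rounds) {
        TickTock monitor = new TickTock();
        Thread ticker = new Thread(() -> {
            try { for (int i = 0; i < rounds; i++) monitor.tick(); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        Thread tocker = new Thread(() -> {
            try { for (int i = 0; i < rounds; i++) monitor.tock(); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        tocker.start(); // started first on purpose: it has to wait for the first tick
        ticker.start();
        try {
            ticker.join();
            tocker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return monitor.snapshot();
    }

    public static void main(String[] args) {
        System.out.println(run(2)); // [Tick, Tock, Tick, Tock]
    }
}
```

The condition is re-checked in a `while` loop because a woken thread only knows that something changed, not that its own condition now holds.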

Sample implementation

// C#
class MonitorLock
{
}

class MonitorCondition
{
	MonitorLock mlock;
	Queue<Thread> threads;

	public MonitorCondition(MonitorLock _lock)
	{
		threads = new Queue<Thread>();
		mlock = _lock;
	}

	public void Wait()
	{
		var willSleep = false;

		lock (mlock)
		{
			willSleep = threads.Count > 0;
			threads.Enqueue(Thread.CurrentThread);
		}
		
		if (willSleep)
		{
			try { Thread.Sleep(Timeout.Infinite); }
			catch (ThreadInterruptedException) { }
		}
	}
	
	public void Notify()
	{
		Thread next = null;

		lock (mlock)
		{
			// Remove the thread that has just finished, then look at the next waiter
			if (threads.Count > 0)
				threads.Dequeue();

			if (threads.Count > 0)
				next = threads.Peek();
		}
		
		if (next != null)
			next.Interrupt();
	}
}

class Monitor
{
	MonitorLock mlock;
	MonitorCondition mcondition;

	public Monitor()
	{
		mlock = new MonitorLock();
		mcondition = new MonitorCondition(mlock);
	}

	public void Tick()
	{
		mcondition.Wait();
		Thread.Sleep(1000);
		Console.WriteLine("Tick");
		mcondition.Notify();
	}
	
	public void Tock()
	{
		mcondition.Wait();
		Thread.Sleep(1000);
		Console.WriteLine("Tock");
		mcondition.Notify();
	}
}

// Java
class MonitorLock {
}

class MonitorCondition {
	MonitorLock mlock;
	Queue<Thread> threads;

	public MonitorCondition(MonitorLock _lock) {
		threads = new LinkedList<Thread>();
		mlock = _lock;
	}

	// Named await/signal because Object.wait() and Object.notify() are final
	public void await() throws InterruptedException {
		synchronized (mlock) {
			threads.add(Thread.currentThread());

			// Sleep until this thread has reached the head of the queue
			while (threads.peek() != Thread.currentThread())
				mlock.wait();
		}
	}
	
	public void signal() {
		synchronized (mlock) {
			// The finished thread leaves the queue; the new head may continue
			threads.poll();
			mlock.notifyAll();
		}
	}
}

class Monitor {
	MonitorLock mlock;
	MonitorCondition mcondition;

	public Monitor() {
		mlock = new MonitorLock();
		mcondition = new MonitorCondition(mlock);
	}

	public void tick() throws InterruptedException {
		mcondition.await();
		Thread.sleep(1000);
		System.out.println("Tick");
		mcondition.signal();
	}
	
	public void tock() throws InterruptedException {
		mcondition.await();
		Thread.sleep(1000);
		System.out.println("Tock");
		mcondition.signal();
	}
}

// C++
class MonitorLock {
private:
	mutex m;

public:
	void Acquire() {
		m.lock();
	}
	
	void Release() {
		m.unlock();
	}
};

class MonitorCondition {
private:
	MonitorLock* mlock;
	queue<volatile bool*> threads;

public:
	MonitorCondition(MonitorLock* _lock) : mlock(_lock) {
	}

	void Wait() {
		mlock->Acquire();
		bool willSleep = threads.size() > 0;
		volatile bool* sleepRef = &willSleep;
		threads.push(sleepRef);
		mlock->Release();
		
		while (*sleepRef) {
			chrono::milliseconds wait(100);
			this_thread::sleep_for(wait);
		}
	}
	
	void Notify() {
		mlock->Acquire();
		threads.pop();

		// Wake the next waiter while the lock still protects the queue
		if (threads.size() > 0)
			*(threads.front()) = false;

		mlock->Release();
	}
};

class Monitor {
private:
	MonitorLock* mlock;
	MonitorCondition* mcondition;

public:
	Monitor() {
		mlock = new MonitorLock();
		mcondition = new MonitorCondition(mlock);
	}

	void Tick() {
		chrono::seconds second(1);
		mcondition->Wait();
		this_thread::sleep_for(second);
		cout << "Tick" << endl;
		mcondition->Notify();
	}
	
	void Tock() {
		chrono::seconds second(1);
		mcondition->Wait();
		this_thread::sleep_for(second);
		cout << "Tock" << endl;
		mcondition->Notify();
	}
};

Practical considerations

  • Each condition represents a waiting room
  • The Notify method is used to wake up a waiting process
  • In real-life this works like a fast food restaurant
  • Separate concerns and protect object state from uncontrolled changes
  • Objects should be responsible for ensuring that any of their methods that require synchronization are serialized transparently
  • Conditions act as a scheduler (in fact, using the scheduler pattern for them might be considered)
  • Be aware of the nested monitor lockout (no nesting!)

Sample monitor sequence

participant Client1
participant Client2
participant Monitor
participant MonitorLock
participant MonitorCondition
activate Client1
Client1 -> Monitor: syncMethod1()
activate Monitor
Monitor -> MonitorLock: acquire()
Monitor -> Monitor: run()
activate Monitor
deactivate Monitor
Monitor -> MonitorCondition: wait()
deactivate Monitor
activate MonitorCondition
MonitorCondition -> MonitorLock: release()
MonitorCondition -> Client1: suspendThread()
deactivate MonitorCondition
Client2 -> Monitor: syncMethod2()
activate Client2
activate Monitor
Monitor -> MonitorLock: acquire()
Monitor -> Monitor: run()
activate Monitor
deactivate Monitor
Monitor -> MonitorCondition: notify()
activate MonitorCondition
Monitor -> MonitorLock: release()
deactivate Monitor
deactivate Client2
MonitorCondition -> Client1: resumeThread()
deactivate MonitorCondition
Client1 -> Monitor: syncMethod1()
activate Monitor
Monitor -> MonitorLock: acquire()
Monitor -> Monitor: run()
activate Monitor
deactivate Monitor
Monitor -> MonitorLock: release()

The thread pool pattern

  • Usually we don't want to create an arbitrary number of threads
  • Instead we want a maximum number of threads to handle open tasks
  • The solution is to use a thread pool, which recycles and limits threads
  • Idea: Obtain threads faster, reduce used resources, optimal usage
  • A thread pool consists of a task queue and a pool of running workers
  • If a thread pool is available we should definitely prefer it to plain threads

Thread pool diagram

skinparam classAttributeIconSize 0
interface Worker {
  prepare()
  run()
}
class WorkerThread implements Worker {
  +prepare()
  +run()
}
interface WorkUnit {
  process()
}
class ConcreteWorkUnit implements WorkUnit {
  +process()
}
class ThreadPool {
  +initialize()
  +queue()
  +getStatus()
  +shutdown()
}
WorkerThread -> WorkUnit
ThreadPool o-> Worker

Remarks

  • The thread pool pattern makes most sense with the task concept
  • A task is a wrapper around a method call that could run concurrently
  • In general we have more tasks than threads, i.e. tasks are executed by threads
  • As soon as a thread completes its task, it will request the next task from the queue until all tasks have been completed
  • The thread can then terminate, or sleep until there are new tasks available
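
The task concept can be sketched with the thread pool from the Java standard library, here with more tasks than threads. `PoolDemo` and `sumOfSquares` are hypothetical names; `Executors.newFixedThreadPool`, `submit` and `Future.get` are the standard `java.util.concurrent` calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PoolDemo {
    // Runs `tasks` small tasks on a pool of `nThreads` threads and sums the results.
    static int sumOfSquares(int nThreads, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int i = 1; i <= tasks; i++) {
                final int n = i;
                Callable<Integer> task = () -> n * n; // a task wraps a method call
                results.add(pool.submit(task));       // queued until a thread is free
            }

            int sum = 0;
            for (Future<Integer> result : results)
                sum += result.get(); // blocks until that task has completed
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // workers terminate once the queue is drained
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(2, 10)); // prints 385
    }
}
```

Ten tasks are executed by only two threads; each thread picks up the next queued task as soon as it has finished its current one.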

Sample implementation

// C#
public interface IWorkUnit
{
    void Process();
}

public interface IWorker
{
    void Prepare();
    void Run();
    void Close();
}

public class WorkerThread : IWorker
{
    Queue<IWorkUnit> queue;
    Thread thread;

    public WorkerThread(Queue<IWorkUnit> queue)
    {
        this.queue = queue;
    }

    public void Prepare()
    {
        thread = new Thread(Loop);
    }

    public void Run()
    {
        thread.Start();
    }

    public void Close()
    {
        thread.Abort();
    }

    void Loop()
    {
        while (true)
        {
            IWorkUnit item = null;
            
            lock(queue)
            {
                if (queue.Count > 0)
                    item = queue.Dequeue();
            }
            
            if (item != null)
                item.Process();
            else
                Thread.Sleep(100);
        }
    }
}

public class ThreadPool
{
    private readonly int nThreads;
    private readonly IWorker[] threads;
    private readonly Queue<IWorkUnit> queue;

    public ThreadPool(int nThreads)
    {
        this.nThreads = nThreads;
        this.queue = new Queue<IWorkUnit>();
        this.threads = new IWorker[nThreads];
    }

    public void Initialize()
    {
        for (int i = 0; i < nThreads; i++)
        {
            threads[i] = new WorkerThread(queue);
            threads[i].Prepare();
            threads[i].Run();
        }
    }

    public void Shutdown()
    {
        for (int i = 0; i < nThreads; i++)
        {
            threads[i].Close();
        }
    }

    public void Queue(IWorkUnit item)
    {
        lock(queue)
        {
            queue.Enqueue(item);
        }
    }
}

// Java
public interface WorkUnit {
    void process();
}

public class WorkerThread extends Thread {
    LinkedList<WorkUnit> queue;

    public WorkerThread(LinkedList<WorkUnit> queue) {
        this.queue = queue;
    }

    public void run() {
        WorkUnit item;

        while (true) {
            synchronized(queue) {
                while (queue.isEmpty()) {
                    try { queue.wait(); }
                    catch (InterruptedException e) { return; } // interrupted by shutdown(): stop this worker
                }

                item = queue.removeFirst();
            }

            try {
                item.process();
            } catch (RuntimeException e) {
                /* ... */
            }
        }
    }
}

public class ThreadPool {
    private final int nThreads;
    private final Thread[] threads;
    private final LinkedList<WorkUnit> queue;

    public ThreadPool(int nThreads) {
        this.nThreads = nThreads;
        this.queue = new LinkedList<WorkUnit>();
        this.threads = new Thread[nThreads];
    }

    public void initialize() {
        for (int i = 0; i < nThreads; i++) {
            threads[i] = new WorkerThread(queue);
            threads[i].start();
        }
    }

    public void shutdown() {
        for (int i = 0; i < nThreads; i++) {
            // Thread has no shutdown() method; interrupt() asks the worker to stop
            threads[i].interrupt();
        }
    }

    public void queue(WorkUnit item) {
        synchronized(queue) {
            queue.addLast(item);
            queue.notify();
        }
    }
}

// C++
class WorkUnit {
public:
    virtual void Process() = 0;
};

class Worker {
public:
    virtual void Prepare() = 0;
    virtual void Run() = 0;
};

class WorkerThread : public Worker {
private:
    queue<WorkUnit*>* workers;
    thread* worker;
    mutex* key;

    void Loop() {
        chrono::milliseconds wait(100);

        while (true) {
            WorkUnit* item = NULL;
            key->lock();

            if (workers->empty() == false) {
                item = workers->front();
                workers->pop();
            }

            key->unlock();
            
            if (item != NULL)
                item->Process();
            else
                this_thread::sleep_for(wait);
        }
    }
 
public:
    WorkerThread(queue<WorkUnit*> &workers, mutex &key) {
        this->workers = &workers;
        this->key = &key;
    }

    void Prepare() {
        worker = new thread(&WorkerThread::Loop, this);
    }

    void Run() {
        worker->detach();
    }
};

class ThreadPool {
private:
    const int nThreads;

    Worker** threads;
    queue<WorkUnit*> workers;
    mutex key;

public:
    ThreadPool(int nThreads) : nThreads(nThreads) {
        this->threads = new Worker*[nThreads];
    }

    void Initialize() {
        for (int i = 0; i < nThreads; i++) {
            threads[i] = new WorkerThread(workers, key);
            threads[i]->Prepare();
            threads[i]->Run();
        }
    }

    void Queue(WorkUnit* item) {
        key.lock();
        workers.push(item);
        key.unlock();
    }
};

Practical considerations

  • In .NET we could use the ThreadPool class
  • However, for some problems our own thread pool might be useful
  • Examples: Long running tasks, sleeping tasks with frequent polling
  • The WorkUnit interface symbolizes a proper work item
  • This allows our own thread pool to return results after completion
  • Also we might implement dependencies from one WorkUnit to another
  • This concept is implemented in .NET with the TPL

Advantages of multi-threading

[Figure: advantages of multi-threading (multithread.png)]

The active object pattern

  • The AO pattern decouples method execution from method invocation for an object
  • The invocation should occur in the client's thread
  • The execution should occur in the AO's own thread of control
  • The design should make this look transparent (using the proxy pattern)

Active object components

  • Proxy: provides an interface to the client
  • Worker: base class with one subclass for each method of the proxy
  • Activation queue: contains the requested invocations
  • Scheduler: picks the request to be processed next
  • Servant: processes the requests
  • Future: contains the response

Active object diagram

skinparam classAttributeIconSize 0
class Proxy {
  +runWorker1()
  +runWorkerN()
}
class Scheduler {
  +dispatch()
  +queue()
}
class ActivationQueue {
  +enqueue()
  +dequeue()
}
abstract class Worker {
  +{abstract} canRun(): Boolean
  +{abstract} run()
}
class Servant {
  +runWorker1()
  +runWorkerN()
}
Client .> Future : "result"
Client .-.-> Proxy : "invoke"
Worker .-.-> Future : "modify"
Worker <-o Scheduler
Proxy "1" - "1" Scheduler
Proxy ..> Future : "instantiate"
Proxy ..> Worker : "instantiate"
Scheduler "1" -- "1" ActivationQueue
ActivationQueue "1" --> "*" Worker : "maintain"
Worker ..> Servant : "execute"
class ConcreteWorker1 extends Worker {
  +canRun(): Boolean
  +run()
}
class ConcreteWorkerN extends Worker {
  +canRun(): Boolean
  +run()
}

Remarks

  • The proxy transforms each request into an instance of a method class
  • Possible parameters are stored / used for object creation
  • It also creates the appropriate future for storing the result
  • The future is initially empty and also has a status field
  • The method object has a reference to the future
  • The activation queue can be implemented using the Monitor pattern
  • The servant has the same interface as the proxy and implements the actual methods
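
The interplay of proxy, method object, future and servant can be sketched compactly in Java. This is a simplification under stated assumptions: a single-threaded executor stands in for both scheduler and activation queue, a lambda stands in for the worker class, and `AdderServant`, `AdderProxy` and `ActiveObjectDemo` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Servant: implements the actual operation; only ever called from the AO thread.
class AdderServant {
    private int total = 0; // no lock needed: accessed by a single thread

    int add(int amount) {
        total += amount;
        return total;
    }
}

// Proxy: same operation as the servant, but each call returns a future at once.
class AdderProxy {
    private final AdderServant servant = new AdderServant();
    // Assumption: one executor thread replaces scheduler plus activation queue.
    private final ExecutorService activationQueue = Executors.newSingleThreadExecutor();

    CompletableFuture<Integer> add(int amount) {
        // The lambda is the method object: it stores the parameter and fills the future.
        return CompletableFuture.supplyAsync(() -> servant.add(amount), activationQueue);
    }

    void shutdown() {
        activationQueue.shutdown();
    }
}

class ActiveObjectDemo {
    static int run() {
        AdderProxy proxy = new AdderProxy();
        try {
            proxy.add(1); // invocation happens in the client's thread
            proxy.add(2);
            CompletableFuture<Integer> last = proxy.add(3);
            return last.join(); // execution happened in the AO thread
        } finally {
            proxy.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 6
    }
}
```

Because the executor processes requests one at a time in submission order, the servant needs no synchronization of its own.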

Sample implementation

// C#
class Future
{
	public bool IsFinished
	{
		get;
		private set;
	}
	
	public object Result
	{
		get;
		private set;
	}
	
	public void SetResult(object value)
	{
		if (IsFinished)
			return;
			
		IsFinished = true;
		Result = value;
	}
}

interface IWorker
{
	bool CanRun();
	
	void Run();
	
	Future Result { get; }
}

class Servant
{
	public double DoWork()
	{
		var sw = Stopwatch.StartNew();
		Console.WriteLine("I am now running ...");
		Thread.Sleep(1000);
		sw.Stop();
		return sw.ElapsedMilliseconds;
	}
}

class LongWorker : IWorker
{
	Servant servant;
	Future result;
	DateTime ahead;
	
	public LongWorker()
	{
		this.servant = new Servant();
		this.result = new Future();
		this.ahead = DateTime.Now.AddSeconds(10);
	}
	
	public Future Result 
	{ 
		get { return result; } 
	}

	public bool CanRun()
	{
		return DateTime.Now.CompareTo(ahead) >= 0;
	}
	
	public void Run()
	{
		var value = servant.DoWork();
		result.SetResult(value);
	}
}

class ShortWorker : IWorker
{
	Future result;
	
	public ShortWorker()
	{
		this.result = new Future();
	}
	
	public Future Result 
	{ 
		get { return result; } 
	}

	public bool CanRun()
	{
		return true;
	}
	
	public void Run()
	{
		result.SetResult("Short worker finished first!");
	}
}

class Scheduler
{
	List<IWorker> workers;
	static Scheduler current;
	Thread thread;
	object mlock;
	
	private Scheduler()
	{
		mlock = new object();
		workers = new List<IWorker>();
		thread = new Thread(Loop);
		thread.Start();
	}
	
	void Loop()
	{
		try
		{
			while (true)
			{
				Dispatch();
				Thread.Sleep(5);
			}
		}
		catch (ThreadInterruptedException)
		{
			// Stop() interrupts the thread to end the loop
		}
	}
	
	public static Scheduler Current
	{
		get { return current ?? (current = new Scheduler()); }
	}
	
	public void Stop()
	{
		thread.Interrupt();
		current = null;
	}

	public void Dispatch()
	{
		for (int i = 0; i < workers.Count; i++)
		{
			var worker = workers[i];
				
			if (worker.CanRun())
			{
				lock (mlock) 
				{
					workers.RemoveAt(i);
				}
				
				worker.Run();
				Console.WriteLine("Worker finished!");
				break;
			}
		}
	}
	
	public void Queue(IWorker worker)
	{
		lock (mlock)
		{
			workers.Add(worker);
		}
	}
}

class Proxy
{
	public Future RunLongWorker()
	{
		var w = new LongWorker();
		Scheduler.Current.Queue(w);
		return w.Result;
	}
	
	public Future RunShortWorker()
	{
		var w = new ShortWorker();
		Scheduler.Current.Queue(w);
		return w.Result;
	}
}

// Java
class Future {
	private boolean finished;
	private Object result;

	public boolean getIsFinished() {
		return finished;
	}
	
	public Object getResult() {
		return result;
	}
	
	public void setResult(Object value) {
		if (finished)
			return;
			
		finished = true;
		result = value;
	}
}

interface Worker {
	boolean canRun();
	
	void run();
	
	Future getResult();
}

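class Servant {
	public double doWork() {
		long start = System.nanoTime();
		System.out.println("I am now running ...");
		try { Thread.sleep(1000); }
		catch (InterruptedException e) { Thread.currentThread().interrupt(); }
		// Elapsed time in milliseconds
		return (System.nanoTime() - start) / 1e6;
	}
}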

class LongWorker implements Worker {
	Servant servant;
	Future result;
	int tries;
	
	public LongWorker() {
		this.servant = new Servant();
		this.result = new Future();
		this.tries = 0;
	}
	
	public Future getResult() { 
		return result; 
	}

	public boolean canRun() {
		return tries++ == 10;
	}
	
	public void run() {
		double value = servant.doWork();
		result.setResult(value);
	}
}

class ShortWorker implements Worker {
	Future result;
	
	public ShortWorker() {
		this.result = new Future();
	}
	
	public Future getResult() { 
		return result; 
	}

	public boolean canRun() {
		return true;
	}
	
	public void run() {
		result.setResult("Short worker finished first!");
	}
}

class Scheduler {
	List<Worker> workers;
	static Scheduler current;
	Object mlock;
	Thread thread;
	
	private Scheduler() {
		mlock = new Object();
		workers = new ArrayList<Worker>();
		thread = new Thread(new Loop());
		thread.start();
	}
	
	public class Loop implements Runnable {
		public void run() {
			try {
				while (true) {
					// Call the enclosing instance; getCurrent() may not be assigned yet
					dispatch();
					Thread.sleep(5);
				}
			} catch (InterruptedException e) {
				// stop() interrupts the thread to end the loop
			}
		}
	}
	
	public static Scheduler getCurrent() {
		if (current == null)
			current = new Scheduler();

		return current;
	}
	
	public void stop() {
		thread.interrupt();
		current = null;
	}

	public void dispatch() {
		Worker ready = null;

		// Select and remove the next runnable worker under the lock
		synchronized (mlock) {
			for (int i = 0; i < workers.size(); i++) {
				if (workers.get(i).canRun()) {
					ready = workers.remove(i);
					break;
				}
			}
		}

		if (ready != null) {
			ready.run();
			System.out.println("Worker finished!");
		}
	}
	
	public void queue(Worker worker) {
		synchronized (mlock) {
			workers.add(worker);
		}
	}
}

class Proxy {
	public Future RunLongWorker() {
		LongWorker w = new LongWorker();
		Scheduler.getCurrent().queue(w);
		return w.getResult();
	}
	
	public Future RunShortWorker() {
		ShortWorker w = new ShortWorker();
		Scheduler.getCurrent().queue(w);
		return w.getResult();
	}
}

// C++
class Future {
private:
	bool finished = false;
	string result;

public:
	bool GetIsFinished() { 
		return finished;
	}
	
	string GetResult() { 
		return result;
	}
	
	void SetResult(string value) {
		if (finished)
			return;
			
		finished = true;
		result = value;
	}
};

class Worker {
public:
	virtual bool CanRun() = 0;
	
	virtual void Run() = 0;
	
	virtual Future* GetResult() = 0;
};

class Servant {
public:
	double DoWork() {
		cout << "I am now running ..." << endl;
		this_thread::sleep_for(chrono::seconds(1));
		return 1000.0;
	}
};

class LongWorker : public Worker {
private:
	Servant servant;
	Future* result;
	int trials;
	
public:
	LongWorker() {
		result = new Future();
		trials = 0;
	}
	
	Future* GetResult() { 
		return result;
	}

	bool CanRun() {
		return trials++ == 10;
	}
	
	void Run() {
		double value = servant.DoWork();
		ostringstream strs;
		strs << value;
		result->SetResult(strs.str());
	}
};

class ShortWorker : public Worker {
private:
	Future* result;

public:
	ShortWorker() {
		result = new Future();
	}
	
	Future* GetResult() { 
		return result;
	}

	bool CanRun() {
		return true;
	}
	
	void Run() {
		result->SetResult("Short worker finished first!");
	}
};

class Scheduler {
private:
	static Scheduler* current;
	list<Worker*> workers;
	thread* t;
	mutex m;
	volatile bool active;
	
	Scheduler() {
		active = true;
		t = new thread(&Scheduler::Loop, this);
	}

	Scheduler(Scheduler const&);
	void operator=(Scheduler const&);
	
	void Loop() {
		while (active) {
			Dispatch();
			this_thread::sleep_for(chrono::milliseconds(5));
		}
	}

public:
	static Scheduler* GetCurrent() {
		if (current == NULL)
			current = new Scheduler();

		return current;
	}
	
	void Stop() {
		active = false;
		current = NULL;
		t->join();
	}

	void Dispatch() {
		for (list<Worker*>::iterator it = workers.begin(); it != workers.end(); ++it) {
			Worker* worker = *it;
				
			if (worker->CanRun()) {
				m.lock();
				workers.remove(worker);
				m.unlock();
				worker->Run();
				cout << "Worker finished!" << endl;
				break;
			}
		}
	}
	
	void Queue(Worker* worker) {
		m.lock();
		workers.push_back(worker);
		m.unlock();
	}
};

Scheduler* Scheduler::current;

class Proxy {
public:
	Future* RunLongWorker() {
		auto w = new LongWorker();
		Scheduler::GetCurrent()->Queue(w);
		return w->GetResult();
	}
	
	Future* RunShortWorker() {
		auto w = new ShortWorker();
		Scheduler::GetCurrent()->Queue(w);
		return w->GetResult();
	}
};

Practical considerations

  • The AO requires the implementation of many classes
  • In particular, for every operation on the servant (and proxy) a worker class must be prepared
  • Every such class must encode all the parameters of the operation and maintain a future
  • Usually this is boring and repetitive code that could be generated
  • This is therefore a good candidate for using templates
  • Additionally we could take advantage of type lists

Sample active object sequence

participant Client
participant Proxy
participant Scheduler
participant ActivationQueue
participant ConcreteWorker
participant Servant
activate Client
activate Scheduler
Client -> Proxy: runWorker()
activate Proxy
create ConcreteWorker
Proxy -> ConcreteWorker: new
Proxy -> Scheduler: queue(Worker)
Client <-- Proxy: future
deactivate Proxy
Scheduler -> ActivationQueue: enqueue(Worker)
Scheduler -> ConcreteWorker: canRun()
activate ConcreteWorker
Scheduler <-- ConcreteWorker: return true
deactivate ConcreteWorker
Scheduler -> ActivationQueue: dequeue(Worker)
Scheduler -> Scheduler: dispatch(Worker)
activate Scheduler
Scheduler -> ConcreteWorker: run()
deactivate Scheduler
activate ConcreteWorker
ConcreteWorker -> Servant: run()
Client <-- ConcreteWorker: modify future
deactivate ConcreteWorker

The reactor pattern

  • GUIs give us a single thread of control - the event loop or reactor
  • The structure of such a reactor is as follows:
    1. Resources in form of Handles
    2. Event Demultiplexer, which uses the event loop to block on all resources
    3. Dispatcher, registers / unregisters handlers and dispatches events
    4. Event Handler, with its assigned Handle (resource)
  • The demultiplexer sends a handle to the dispatcher as soon as the resource can be used without blocking

Reactor diagram

skinparam classAttributeIconSize 0
class Dispatcher {
  +handle()
  +registerHandler(EventHandler)
  +removeHandler(EventHandler)
}

class Handle {
}

class EventDemultiplexer {
  +select()
}

class EventHandler {
  +{abstract}handle(type)
  +getHandle(): Handle
}

Dispatcher "1" -> "*" EventHandler
Dispatcher --> EventDemultiplexer : "use"
EventDemultiplexer --> Handle : "notify"
EventHandler .> Handle
ConcreteEventHandler --|> EventHandler

Remarks

  • A handle might be a system resource (connection, file, ...)
  • All reactor systems are single-threaded
  • However, they can exist in multi-threaded environments
  • Usually communication with threads is possible by using channels
  • JavaScript runs in a reactor, with code being single-threaded
  • The reactor allows code to run concurrently without cross-threading
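
A single-threaded event loop that receives its events from another thread can be sketched as follows. The assumptions: a handle is just a name, a `BlockingQueue` plays the demultiplexer and acts as the channel between threads, and `MiniReactor`, `ReactorDemo` and the `STOP` sentinel are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class MiniReactor {
    // Hypothetical sentinel handle that ends the event loop.
    static final String STOP = "STOP";

    private final Map<String, Runnable> handlers = new HashMap<>();
    private final BlockingQueue<String> ready = new LinkedBlockingQueue<>(); // demultiplexer

    // Must be called before the loop starts; the map itself is not thread-safe.
    void registerHandler(String handle, Runnable handler) {
        handlers.put(handle, handler);
    }

    // May be called from any thread: marks a handle as ready.
    void notifyReady(String handle) {
        ready.add(handle);
    }

    // The event loop: runs every handler on the calling thread, one at a time.
    void handleEvents() throws InterruptedException {
        while (true) {
            String handle = ready.take(); // blocks until some handle is ready
            if (handle.equals(STOP))
                return;
            Runnable handler = handlers.get(handle);
            if (handler != null)
                handler.run();
        }
    }
}

class ReactorDemo {
    static List<String> run() {
        MiniReactor reactor = new MiniReactor();
        List<String> log = new ArrayList<>(); // touched only by the loop thread
        reactor.registerHandler("socket", () -> log.add("socket readable"));
        reactor.registerHandler("timer", () -> log.add("timer fired"));

        // Another thread produces the events; the handlers still run on one thread.
        Thread producer = new Thread(() -> {
            reactor.notifyReady("timer");
            reactor.notifyReady("socket");
            reactor.notifyReady(MiniReactor.STOP);
        });
        producer.start();
        try {
            reactor.handleEvents();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [timer fired, socket readable]
    }
}
```

The handlers never run in parallel, so the state they share (`log` here) needs no locking; only the queue crosses thread boundaries.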

Sample implementation

// C#
class Handle
{
	//Empty for illustration
}

abstract class EventHandler
{
	public abstract Handle Handle { get; }
	
	public abstract void HandleEvent();
}

class ConcreteEventHandler : EventHandler
{
	private Handle myhandle;
	
	public override Handle Handle
	{
		get { return myhandle; }
	}
	
	public override void HandleEvent()
	{
		/* ... */
	}
}

class EventDemultiplexer
{
	object obj = new object();
	Queue<Handle> handles = new Queue<Handle>();
	
	public Handle Select()
	{
		lock (obj)
		{
			if (handles.Count > 0)
				return handles.Dequeue();
		}
		
		Thread.Sleep(100);
		return Select();
	}
	
	public void Notify(Handle myhandle)
	{
		lock (obj)
		{
			if (!handles.Contains(myhandle))
				handles.Enqueue(myhandle);
		}
	}
}

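class Dispatcher
{
	List<EventHandler> handlers;
	EventDemultiplexer source;
	
	public Dispatcher(EventDemultiplexer source)
	{
		handlers = new List<EventHandler>();
		// The demultiplexer that Handle() polls for ready handles
		this.source = source;
	}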
	
	public void RegisterHandler(EventHandler ev)
	{
		if (!handlers.Contains(ev))
			handlers.Add(ev);
	}
	
	public void RemoveHandler(EventHandler ev)
	{
		if (handlers.Contains(ev))
			handlers.Remove(ev);
	}
	
	public void Handle()
	{
		while (true)
		{
			var handle = source.Select();
			
			foreach (var handler in handlers)
				if (handler.Handle == handle)
					handler.HandleEvent();
		}
	}
}

// Java
class Handle {
	//Empty for illustration
}

abstract class EventHandler {
	public abstract Handle getHandle();
	
	public abstract void handleEvent();
}

class ConcreteEventHandler extends EventHandler {
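	private Handle myhandle;

	// The handle this handler is responsible for is supplied by the creator
	public ConcreteEventHandler(Handle handle) {
		myhandle = handle;
	}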

	@Override
	public Handle getHandle() {
		return myhandle;
	}
	
	@Override
	public void handleEvent() {
		/* ... */
	}
}

class EventDemultiplexer {
	Object obj = new Object();
	Queue<Handle> handles = new LinkedList<Handle>();
	
	public Handle select() throws InterruptedException {
		// Poll until a handle is ready; a loop avoids unbounded recursion
		while (true) {
			synchronized (obj) {
				if (handles.size() > 0)
					return handles.remove();
			}
			
			Thread.sleep(100);
		}
	}
	
	public void notify(Handle myhandle) {
		synchronized (obj) {
			if (!handles.contains(myhandle))
				handles.add(myhandle);
		}
	}
}

class Dispatcher {
	List<EventHandler> handlers;
	EventDemultiplexer source;
	
	// The demultiplexer that supplies ready handles is passed in by the creator
	public Dispatcher(EventDemultiplexer source) {
		handlers = new ArrayList<EventHandler>();
		this.source = source;
	}
	
	public void registerHandler(EventHandler ev) {
		if (!handlers.contains(ev))
			handlers.add(ev);
	}
	
	public void removeHandler(EventHandler ev) {
		if (handlers.contains(ev))
			handlers.remove(ev);
	}
	
	// Declares InterruptedException because select() sleeps while polling
	public void handle() throws InterruptedException {
		while (true) {
			Handle handle = source.select();
			
			for (EventHandler handler : handlers)
				if (handler.getHandle() == handle)
					handler.handleEvent();
		}
	}
}
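
// C++ version
#include <chrono>
#include <iostream>
#include <mutex>
#include <set>
#include <thread>

using namespace std;

class Handle {
	//Empty for illustration
};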

class EventHandler {
public:
	virtual Handle* GetHandle() = 0;
	
	virtual void HandleEvent() = 0;
};

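class ConcreteEventHandler : public EventHandler {
private:
	Handle* myhandle;

public:
	// The handle this handler is responsible for is supplied by the creator
	ConcreteEventHandler(Handle* handle) : myhandle(handle) {
	}

	Handle* GetHandle() {
		return myhandle;
	}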
	
	void HandleEvent() {
		cout << "Start handling pseudo event ... ";
		this_thread::sleep_for(chrono::milliseconds(2000));
		cout << "done!!" << endl;
	}
};

class EventDemultiplexer {
private:
	mutex m;
	set<Handle*> handles;
	
public:
	Handle* Select() {
		// Poll until a handle is ready; a loop avoids unbounded recursion
		while (true) {
			{
				// lock_guard releases the mutex on every exit path
				lock_guard<mutex> guard(m);

				if (!handles.empty()) {
					Handle* handle = *(handles.begin());
					handles.erase(handles.begin());
					return handle;
				}
			}

			this_thread::sleep_for(chrono::milliseconds(100));
		}
	}
	
	void Notify(Handle* myhandle) {
		lock_guard<mutex> guard(m);
		handles.insert(myhandle);
	}
};

class Dispatcher {
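private:
	set<EventHandler*> handlers;
	EventDemultiplexer& source;

public:
	// The demultiplexer that supplies ready handles is passed in by the creator
	Dispatcher(EventDemultiplexer& source) : source(source) {
	}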
	
	void RegisterHandler(EventHandler* ev) {
		handlers.insert(ev);
	}
	
	void RemoveHandler(EventHandler* ev) {
		handlers.erase(ev);
	}
	
	void Dispatch() {
		Handle* handle;

		while (true) {
			handle = source.Select();
			
			for (set<EventHandler*>::iterator it = handlers.begin(); it != handlers.end(); ++it)
				if ((*it)->GetHandle() == handle)
					(*it)->HandleEvent();
		}
	}
};

Practical considerations

  • Due to the synchronous calling of event handlers, the reactor gives us simple concurrency
  • This is achieved without adding the complexity of multiple threads to the system
  • However, the pattern itself is tedious to debug due to the inverted flow of control
  • Additionally, handlers run one after another on a single thread, so a long-running handler delays all others
  • Scalability is limited for the same reason: a single reactor cannot spread its handlers over several CPU cores
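
The effect of synchronous dispatch can be made visible with a small Java sketch. The class SyncDispatchDemo and its methods are hypothetical, and the handlers are reduced to a name plus a simulated amount of work. The log shows that the fast handler cannot start before the slow one has finished.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a dispatcher loop body with simulated handlers.
class SyncDispatchDemo {
	// Calls each handler synchronously on the calling thread and records the order
	static List<String> dispatch(String[] names, long[] workMillis) {
		List<String> log = new ArrayList<>();
		for (int i = 0; i < names.length; i++) {
			log.add(names[i] + " started");
			simulateWork(workMillis[i]);
			log.add(names[i] + " finished");
		}
		return log;
	}

	// Stands in for the real work of a handler
	static void simulateWork(long millis) {
		try {
			Thread.sleep(millis);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
	}

	public static void main(String[] args) {
		List<String> log = dispatch(new String[] {"slow", "fast"}, new long[] {200, 0});
		// prints [slow started, slow finished, fast started, fast finished]
		System.out.println(log);
	}
}
```

A common way around this, outside the scope of the sketch, is to keep handlers short and hand long-running work to other threads.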

Sample reactor sequence

participant Program
participant ConcreteEventHandler
participant Dispatcher
participant EventDemultiplexer

activate Program
create Dispatcher
Program -> Dispatcher: new
create ConcreteEventHandler
Program -> ConcreteEventHandler: new
Program -> Dispatcher: registerHandler(handler)
activate Dispatcher
Dispatcher -> ConcreteEventHandler: getHandle()
activate ConcreteEventHandler
Dispatcher <-- ConcreteEventHandler: return handle
deactivate ConcreteEventHandler
deactivate Dispatcher
Program -> Dispatcher: handle()
deactivate Program
activate Dispatcher
Dispatcher -> EventDemultiplexer: select()
Dispatcher -> ConcreteEventHandler: handleEvent()
deactivate Dispatcher
activate ConcreteEventHandler
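
The sequence above can be walked through in a few lines of Java. This is only a sketch under simplifying assumptions: all names carry a hypothetical Seq prefix, the demultiplexer is not thread-safe, and the dispatcher stops when no handle is ready instead of looping forever, so that the example terminates.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// All names are hypothetical and only mirror the participants of the diagram.
class SeqHandle { }

class SeqHandler {
	private final SeqHandle handle = new SeqHandle();
	int handledEvents = 0;

	SeqHandle getHandle() { return handle; }

	void handleEvent() { handledEvents++; }
}

class SeqDemultiplexer {
	private final Queue<SeqHandle> ready = new ArrayDeque<>();

	void notifyReady(SeqHandle handle) { ready.add(handle); }

	// Returns the next ready handle, or null if none is pending (no blocking here)
	SeqHandle select() { return ready.poll(); }
}

class SeqDispatcher {
	private final SeqDemultiplexer source;
	private final List<SeqHandler> handlers = new ArrayList<>();

	SeqDispatcher(SeqDemultiplexer source) { this.source = source; }

	void registerHandler(SeqHandler handler) { handlers.add(handler); }

	// Dispatches until no handle is ready, so that the demo terminates
	void handlePending() {
		SeqHandle handle;
		while ((handle = source.select()) != null) {
			for (SeqHandler handler : handlers)
				if (handler.getHandle() == handle)
					handler.handleEvent();
		}
	}
}

class SeqDemo {
	public static void main(String[] args) {
		SeqDemultiplexer demultiplexer = new SeqDemultiplexer();
		SeqDispatcher dispatcher = new SeqDispatcher(demultiplexer); // Program -> Dispatcher: new
		SeqHandler handler = new SeqHandler();                       // Program -> ConcreteEventHandler: new
		dispatcher.registerHandler(handler);                         // Program -> Dispatcher: registerHandler
		demultiplexer.notifyReady(handler.getHandle());              // an event becomes ready
		dispatcher.handlePending();                                  // select(), then the handler call
		System.out.println(handler.handledEvents);                   // prints 1
	}
}
```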

Literature

  • Lea, Doug (1999). Concurrent Programming in Java: Design Principles and Patterns.
  • Schmidt, Douglas (1995). Reactor: An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events.
  • Schmidt, Douglas; Stal, Michael; Rohnert, Hans; Buschmann, Frank (2000). Pattern-Oriented Software Architecture, Patterns for Concurrent and Networked Objects.