Software Design Patterns
Florian Rappl, Fakultät für Physik, Universität Regensburg
Introduction to modern software architecture
Concurrency patterns
Introduction
- They deal with the multi-threaded programming paradigm
- However, in general they describe asynchronous processes
- This does not necessarily mean multi-threaded
- Usually one does not need threads for async behavior
- Goal: Reduce communication overhead, while increasing flexibility
- Manage program resources more efficiently
Threading overview
Problems and primitives
- Multi-threading might end in race conditions
- A race condition occurs when two or more threads access a shared resource at the same time and at least one of them modifies it
- Memory inconsistencies might also arise, e.g. from instruction re-ordering or from objects that are only partially constructed
- To prevent this we can use mutex objects
- A mutex grants mutually exclusive access and is the basis for other primitives, e.g. barriers, ...
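The effect of a mutex can be sketched in Java as follows. This is only a minimal illustration: the class `CounterDemo` and its methods are hypothetical names, and `ReentrantLock` stands in for the mutex object mentioned above. Without the lock, the increments of several threads could overlap and updates would be lost.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; the names are illustrative only.
class CounterDemo {
    private final ReentrantLock key = new ReentrantLock();
    private int value = 0;

    // The lock turns the read-modify-write step into a critical section.
    void increment() {
        key.lock();
        try {
            value++;
        } finally {
            key.unlock();
        }
    }

    int get() {
        key.lock();
        try {
            return value;
        } finally {
            key.unlock();
        }
    }

    // Starts `threads` threads that each increment the counter `perThread` times.
    static int run(int threads, int perThread) {
        CounterDemo counter = new CounterDemo();
        Thread[] pool = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            pool[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.increment();
                }
            });
            pool[i].start();
        }
        for (Thread t : pool) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10000));
    }
}
```

With the lock in place, every increment is counted, so four threads with 10000 increments each should yield 40000.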
The scheduler pattern
- Old problem: How to synchronize access?
- Special case: Limit access to a specific resource to a single thread
- As a result, access to the resource is always synchronized
- Use-case: Synchronized access to the elements in a GUI
- The scheduler pattern provides the basis for sequencing waiting threads
- It is similar to the mediator pattern, with the various threads being mediated
Scheduler diagram
Remarks
- Usually a mechanism to implement a scheduling policy is provided
- This mechanism is defined in an abstract class
- The pattern is independent of a specific scheduling policy
- This policy might be FIFO, FILO, priority queue, ... or customized
- This pattern adds some overhead beyond a barrier or similar locking techniques
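The independence from a specific policy might be sketched as below. All names (`Job`, `PolicyScheduler`, `FifoScheduler`, `PriorityScheduler`, `PolicyDemo`) are hypothetical and not part of the sample implementation. The abstract class owns the locking, while subclasses only decide which job comes next. For brevity `dispatch` returns the selected job instead of running it.

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;

// Hypothetical unit of work with a name and a priority.
class Job {
    final String name;
    final int priority;

    Job(String name, int priority) {
        this.name = name;
        this.priority = priority;
    }
}

// The mechanism: synchronization lives here, the policy in subclasses.
abstract class PolicyScheduler {
    private final Object key = new Object();

    protected abstract void store(Job job);

    protected abstract Job next(); // null if nothing is waiting

    public void enqueue(Job job) {
        synchronized (key) {
            store(job);
        }
    }

    public Job dispatch() {
        synchronized (key) {
            return next();
        }
    }
}

// Policy 1: first in, first out.
class FifoScheduler extends PolicyScheduler {
    private final Queue<Job> jobs = new ArrayDeque<>();

    protected void store(Job job) { jobs.add(job); }

    protected Job next() { return jobs.poll(); }
}

// Policy 2: highest priority first.
class PriorityScheduler extends PolicyScheduler {
    private final Queue<Job> jobs =
        new PriorityQueue<>(Comparator.comparingInt((Job j) -> -j.priority));

    protected void store(Job job) { jobs.add(job); }

    protected Job next() { return jobs.poll(); }
}

class PolicyDemo {
    // Enqueues three jobs and returns the order in which they are dispatched.
    static String order(PolicyScheduler scheduler) {
        scheduler.enqueue(new Job("a", 1));
        scheduler.enqueue(new Job("b", 3));
        scheduler.enqueue(new Job("c", 2));
        StringBuilder sb = new StringBuilder();
        for (Job j = scheduler.dispatch(); j != null; j = scheduler.dispatch()) {
            sb.append(j.name);
        }
        return sb.toString();
    }
}
```

The same three jobs come out as `abc` under the FIFO policy and as `bca` under the priority policy, without any change to the locking code.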
Sample implementation
interface IWorker
{
void Run();
}
interface IScheduler
{
void Dispatch();
void Enqueue(IWorker worker);
}
class ThreadScheduler
{
IScheduler scheduler;
Thread thread;
public ThreadScheduler(IScheduler scheduler)
{
this.scheduler = scheduler;
this.thread = new Thread(Loop);
}
public void Run()
{
thread.Start();
}
void Loop()
{
while (true)
{
scheduler.Dispatch();
Thread.Sleep(10);
}
}
}
class QueueScheduler : IScheduler
{
Queue<IWorker> queue;
Object key;
public QueueScheduler()
{
queue = new Queue<IWorker>();
key = new Object();
}
public void Dispatch()
{
IWorker process;
lock (key)
{
if (queue.Count == 0)
return;
process = queue.Dequeue();
}
process.Run();
}
public void Enqueue(IWorker worker)
{
lock (key)
{
queue.Enqueue(worker);
}
}
}
class SleepyWorker : IWorker
{
public void Run()
{
Console.WriteLine("Sleeping 1s...");
Thread.Sleep(1000);
Console.WriteLine("Finished sleeping!");
}
}
class WorkingWorker : IWorker
{
public void Run()
{
Console.WriteLine("Doing hard work for 2s...");
Thread.Sleep(2000);
Console.WriteLine("Finished doing work!");
}
}
interface Worker {
void run();
}
interface Scheduler {
void dispatch();
void enqueue(Worker worker);
}
class ThreadScheduler {
Scheduler scheduler;
Thread thread;
public ThreadScheduler(Scheduler scheduler) {
this.scheduler = scheduler;
this.thread = new Thread(new Loop());
}
public void run() {
thread.start();
}
class Loop implements Runnable {
public void run() {
try {
while (true) {
scheduler.dispatch();
Thread.sleep(10);
}
} catch (InterruptedException e) {
// Interrupting the thread ends the dispatch loop
}
}
}
}
class QueueScheduler implements Scheduler {
Queue<Worker> queue;
Object key;
public QueueScheduler() {
queue = new LinkedList<Worker>();
key = new Object();
}
public void dispatch() {
Worker process = null;
synchronized (key) {
process = queue.poll();
}
if (process == null)
return;
process.run();
}
public void enqueue(Worker worker) {
synchronized (key) {
queue.add(worker);
}
}
}
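class SleepyWorker implements Worker {
public void run() {
System.out.println("Sleeping 1s...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Restore the interrupt flag and stop early
Thread.currentThread().interrupt();
return;
}
System.out.println("Finished sleeping!");
}
}
class WorkingWorker implements Worker {
public void run() {
System.out.println("Doing hard work for 2s...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
System.out.println("Finished doing work!");
}
}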
class Worker {
public:
virtual void Run() = 0;
};
class Scheduler {
public:
virtual void Dispatch() = 0;
virtual void Enqueue(Worker *worker) = 0;
};
class ThreadScheduler {
private:
thread *t;
static void Loop(Scheduler *scheduler) {
chrono::milliseconds ten(10);
while (true) {
scheduler->Dispatch();
this_thread::sleep_for(ten);
}
}
public:
ThreadScheduler(Scheduler &scheduler) {
t = new thread(Loop, &scheduler);
}
void Run() {
t->detach();
}
};
class QueueScheduler : public Scheduler {
private:
queue<Worker*> *workers;
mutex key;
public:
QueueScheduler() {
workers = new queue<Worker*>();
}
void Dispatch() {
Worker *process = NULL;
{
// The guard releases the mutex on every exit path, including the early return
lock_guard<mutex> guard(key);
if (workers->empty())
return;
process = workers->front();
workers->pop();
}
process->Run();
}
void Enqueue(Worker *worker) {
key.lock();
workers->push(worker);
key.unlock();
}
};
class SleepyWorker : public Worker {
public:
void Run() {
chrono::seconds one(1);
cout << "Sleeping 1s..." << endl;
this_thread::sleep_for(one);
cout << "Finished sleeping!" << endl;
}
};
class WorkingWorker : public Worker {
public:
void Run() {
chrono::seconds two(2);
cout << "Doing hard work for 2s..." << endl;
this_thread::sleep_for(two);
cout << "Finished doing work!" << endl;
}
};
Practical considerations
- The read/write lock pattern can be implemented using the scheduler pattern
- The .NET TPL uses this pattern to provide a scheduler for tasks
- Intel TBB also offers a task scheduler, which can be used to accommodate a given number of workers
- In general schedulers might be used for various scenarios, not only if a certain task needs to be executed single-threaded
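As a Java counterpart to the library schedulers named above, a single-threaded executor from `java.util.concurrent` can play the role of the scheduler. This is a sketch under that assumption, not the TPL or TBB API; the class name `SingleThreadSchedulerDemo` is made up for illustration. All submitted jobs run one after another on the same thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: the executor serializes access to the shared log.
class SingleThreadSchedulerDemo {
    static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        // One worker thread plus an internal FIFO queue of pending jobs.
        ExecutorService scheduler = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 3; i++) {
            final int id = i;
            scheduler.execute(() -> log.add("job " + id));
        }
        // Accept no new jobs, then wait until the queued ones are done.
        scheduler.shutdown();
        try {
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the executor runs its jobs sequentially, the log entries appear in submission order.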
The monitor pattern
- Synchronization is important and sometimes a certain condition has to be fulfilled for allowing thread(s) to continue
- The monitor object pattern allows both:
- mutual exclusion
- ability to wait for a certain condition
- Therefore this pattern is thread-safe and condition-driven
- Additionally we might want to inform a thread about condition-changes
Monitor diagram
Remarks
- A condition variable is essentially a queue of threads that are waiting for a certain condition to be fulfilled
- Monitors provide a mechanism for threads to temporarily give up exclusive access
- This is done in order to wait for some condition to be met
- After the condition is met the exclusive access is regained
- Their task is then continued (threads are resumed)
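The give-up-and-regain cycle described above can be sketched with Java's built-in monitors. The `BoundedBuffer` class is a hypothetical example: `wait()` releases the exclusive access while the condition is not met, and the lock is re-acquired before `wait()` returns.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical monitor object: all state is guarded by the object's own lock.
class BoundedBuffer {
    private final Queue<Integer> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    synchronized void put(int item) throws InterruptedException {
        // Condition "not full": give up the lock while waiting.
        while (items.size() == capacity) {
            wait();
        }
        items.add(item);
        notifyAll(); // inform waiting threads about the state change
    }

    synchronized int take() throws InterruptedException {
        // Condition "not empty": give up the lock while waiting.
        while (items.isEmpty()) {
            wait();
        }
        int item = items.remove();
        notifyAll();
        return item;
    }

    // A producer puts 1..5 into a buffer of size 2, the caller sums them up.
    static int demo() {
        BoundedBuffer buffer = new BoundedBuffer(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < 5; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }
}
```

The condition is checked in a `while` loop rather than an `if`, since a woken thread has to re-check it after regaining the lock.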
Sample implementation
class MonitorLock
{
}
class MonitorCondition
{
MonitorLock mlock;
Queue<Thread> threads;
public MonitorCondition(MonitorLock _lock)
{
threads = new Queue<Thread>();
mlock = _lock;
}
public void Wait()
{
var willSleep = false;
lock (mlock)
{
willSleep = threads.Count > 0;
threads.Enqueue(Thread.CurrentThread);
}
if (willSleep)
{
try { Thread.Sleep(Timeout.Infinite); }
catch (ThreadInterruptedException) { }
}
}
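public void Notify()
{
Thread next = null;
lock (mlock)
{
// Remove the thread that currently owns the monitor (the caller)
if (threads.Count > 0)
threads.Dequeue();
// Pick the next waiting thread, if any, while still holding the lock
if (threads.Count > 0)
next = threads.Peek();
}
if (next != null)
next.Interrupt();
}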
}
class Monitor
{
MonitorLock mlock;
MonitorCondition mcondition;
public Monitor()
{
mlock = new MonitorLock();
mcondition = new MonitorCondition(mlock);
}
public void Tick()
{
mcondition.Wait();
Thread.Sleep(1000);
Console.WriteLine("Tick");
mcondition.Notify();
}
public void Tock()
{
mcondition.Wait();
Thread.Sleep(1000);
Console.WriteLine("Tock");
mcondition.Notify();
}
}
class MonitorLock {
}
class MonitorCondition {
MonitorLock mlock;
Queue<Thread> threads;
public MonitorCondition(MonitorLock _lock) {
threads = new LinkedList<Thread>();
mlock = _lock;
}
// Named await/signal, because Object.wait() and Object.notify() are final
public void await() throws InterruptedException {
synchronized (mlock) {
threads.add(Thread.currentThread());
// Sleep until this thread has reached the head of the queue
while (threads.peek() != Thread.currentThread())
mlock.wait();
}
}
public void signal() {
synchronized (mlock) {
// Remove the current owner and wake up the waiting threads
threads.poll();
mlock.notifyAll();
}
}
}
class Monitor {
MonitorLock mlock;
MonitorCondition mcondition;
public Monitor() {
mlock = new MonitorLock();
mcondition = new MonitorCondition(mlock);
}
public void tick() throws InterruptedException {
mcondition.await();
Thread.sleep(1000);
System.out.println("Tick");
mcondition.signal();
}
public void tock() throws InterruptedException {
mcondition.await();
Thread.sleep(1000);
System.out.println("Tock");
mcondition.signal();
}
}
class MonitorLock {
private:
mutex m;
public:
void Acquire() {
m.lock();
}
void Release() {
m.unlock();
}
};
class MonitorCondition {
private:
MonitorLock* mlock;
// Each waiting thread owns one flag; atomic (from <atomic>) makes the cross-thread write well-defined
queue<atomic<bool>*> threads;
public:
MonitorCondition(MonitorLock* _lock) : mlock(_lock) {
}
void Wait() {
mlock->Acquire();
atomic<bool> willSleep(threads.size() > 0);
threads.push(&willSleep);
mlock->Release();
while (willSleep) {
chrono::milliseconds wait(100);
this_thread::sleep_for(wait);
}
}
void Notify() {
mlock->Acquire();
threads.pop();
// Wake the next waiting thread while the queue is still protected by the lock
if (threads.size() > 0)
*(threads.front()) = false;
mlock->Release();
}
};
class Monitor {
private:
MonitorLock* mlock;
MonitorCondition* mcondition;
public:
Monitor() {
mlock = new MonitorLock();
mcondition = new MonitorCondition(mlock);
}
void Tick() {
chrono::seconds second(1);
mcondition->Wait();
this_thread::sleep_for(second);
cout << "Tick" << endl;
mcondition->Notify();
}
void Tock() {
chrono::seconds second(1);
mcondition->Wait();
this_thread::sleep_for(second);
cout << "Tock" << endl;
mcondition->Notify();
}
};
Practical considerations
- Each condition represents a waiting room
- The Notify method is used to wake up a waiting process
- In real-life this works like a fast food restaurant
- Separate concerns and protect object state from uncontrolled changes
- Objects should be responsible for ensuring that any of their methods that require synchronization are serialized transparently
- Conditions act as a scheduler (in fact, using a scheduler might be considered)
- Be aware of the nested monitor lockout (no nesting!)
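The waiting-room idea can be sketched with an explicit lock and condition from `java.util.concurrent.locks`. The `TickTock` class below is a hypothetical variant of the Tick/Tock sample in which the two threads strictly alternate. It is a sketch, not a translation of the sample above.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical monitor: one lock, one condition ("the turn has changed").
class TickTock {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition turnChanged = lock.newCondition();
    private final StringBuilder log = new StringBuilder();
    private boolean tickTurn = true;

    void say(String word, boolean isTick) throws InterruptedException {
        lock.lock();
        try {
            // Wait in the "waiting room" until it is this caller's turn.
            while (tickTurn != isTick) {
                turnChanged.await();
            }
            log.append(word).append(' ');
            tickTurn = !isTick;
            turnChanged.signalAll(); // wake up the other party
        } finally {
            lock.unlock();
        }
    }

    private static void repeat(TickTock m, String word, boolean isTick, int rounds) {
        try {
            for (int i = 0; i < rounds; i++) {
                m.say(word, isTick);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs both parties for the given number of rounds and returns the log.
    static String demo(int rounds) {
        TickTock m = new TickTock();
        Thread tick = new Thread(() -> repeat(m, "Tick", true, rounds));
        Thread tock = new Thread(() -> repeat(m, "Tock", false, rounds));
        tock.start(); // started first on purpose, it still has to wait
        tick.start();
        try {
            tick.join();
            tock.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return m.log.toString().trim();
    }
}
```

Only a single lock is involved here, which sidesteps the nested monitor lockout mentioned above.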
Sample monitor sequence
The thread pool pattern
- Usually we don't want to create an arbitrary number of threads
- Instead we want a maximum number of threads to handle open tasks
- The solution is to use a thread pool, which recycles and limits threads
- Idea: Obtain threads faster, reduce used resources, optimal usage
- A thread pool consists of a task queue and a pool of running workers
- If a thread pool is available we should definitely prefer it to plain threads
Thread pool diagram
Remarks
- The thread pool pattern makes most sense with the task concept
- A task is a wrapper around a method call that could run concurrently
- In general we have more tasks than threads, i.e. tasks are executed by threads
- As soon as a thread completes its task, it will request the next task from the queue until all tasks have been completed
- The thread can then terminate, or sleep until there are new tasks available
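The relation between tasks and threads can be sketched with the standard `ExecutorService`. The demo class `FixedPoolDemo` is hypothetical: it submits more tasks than there are threads and records which threads executed them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: many tasks, few threads.
class FixedPoolDemo {
    // Returns { number of completed tasks, number of distinct threads used }.
    static int[] run(int nThreads, int nTasks) {
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        AtomicInteger completed = new AtomicInteger();
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < nTasks; i++) {
            // Each task is queued; an idle worker thread picks it up.
            pool.execute(() -> {
                threadNames.add(Thread.currentThread().getName());
                completed.incrementAndGet();
            });
        }
        // No new tasks are accepted; the queued ones are still processed.
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { completed.get(), threadNames.size() };
    }

    public static void main(String[] args) {
        int[] result = run(3, 20);
        System.out.println(result[0] + " tasks on " + result[1] + " thread(s)");
    }
}
```

All tasks complete, while the number of distinct threads never exceeds the pool size.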
Sample implementation
public interface IWorkUnit
{
void Process();
}
public interface IWorker
{
void Prepare();
void Run();
void Close();
}
public class WorkerThread : IWorker
{
Queue<IWorkUnit> queue;
Thread thread;
// Cooperative stop flag; Thread.Abort is not supported on modern .NET
volatile bool running;
public WorkerThread(Queue<IWorkUnit> queue)
{
this.queue = queue;
}
public void Prepare()
{
running = true;
thread = new Thread(Loop);
}
public void Run()
{
thread.Start();
}
public void Close()
{
// The loop ends after the current work item or sleep interval
running = false;
}
void Loop()
{
while (running)
{
IWorkUnit item = null;
lock(queue)
{
if (queue.Count > 0)
item = queue.Dequeue();
}
if (item != null)
item.Process();
else
Thread.Sleep(100);
}
}
}
public class ThreadPool
{
private readonly int nThreads;
private readonly IWorker[] threads;
private readonly Queue<IWorkUnit> queue;
public ThreadPool(int nThreads)
{
this.nThreads = nThreads;
this.queue = new Queue<IWorkUnit>();
this.threads = new IWorker[nThreads];
}
public void Initialize()
{
for (int i = 0; i < nThreads; i++)
{
threads[i] = new WorkerThread(queue);
threads[i].Prepare();
threads[i].Run();
}
}
public void Shutdown()
{
for (int i = 0; i < nThreads; i++)
{
threads[i].Close();
}
}
public void Queue(IWorkUnit item)
{
lock(queue)
{
queue.Enqueue(item);
}
}
}
public interface WorkUnit {
void process();
}
public class WorkerThread extends Thread {
LinkedList<WorkUnit> queue;
public WorkerThread(LinkedList<WorkUnit> queue) {
this.queue = queue;
}
public void run() {
WorkUnit item;
while (true) {
synchronized(queue) {
while (queue.isEmpty()) {
try { queue.wait(); }
// An interrupt is the shutdown signal, so the worker ends here
catch (InterruptedException e) { return; }
}
item = queue.removeFirst();
}
try {
item.process();
} catch (RuntimeException e) {
/* ... */
}
}
}
}
public class ThreadPool {
private final int nThreads;
private final Thread[] threads;
private final LinkedList<WorkUnit> queue;
public ThreadPool(int nThreads) {
this.nThreads = nThreads;
this.queue = new LinkedList<WorkUnit>();
this.threads = new Thread[nThreads];
}
public void initialize() {
for (int i = 0; i < nThreads; i++) {
threads[i] = new WorkerThread(queue);
threads[i].start();
}
}
public void shutdown() {
for (int i = 0; i < nThreads; i++) {
// Thread has no shutdown() method; the interrupt is the stop request
threads[i].interrupt();
}
}
public void queue(WorkUnit item) {
synchronized(queue) {
queue.addLast(item);
queue.notify();
}
}
}
class WorkUnit {
public:
virtual void Process() = 0;
};
class Worker {
public:
virtual void Prepare() = 0;
virtual void Run() = 0;
};
class WorkerThread : public Worker {
private:
queue<WorkUnit*>* workers;
thread* worker;
mutex* key;
void Loop() {
chrono::milliseconds wait(100);
while (true) {
WorkUnit* item = NULL;
key->lock();
if (workers->empty() == false) {
item = workers->front();
workers->pop();
}
key->unlock();
if (item != NULL)
item->Process();
else
this_thread::sleep_for(wait);
}
}
public:
WorkerThread(queue<WorkUnit*> &workers, mutex &key) {
this->workers = &workers;
this->key = &key;
}
void Prepare() {
worker = new thread(&WorkerThread::Loop, this);
}
void Run() {
worker->detach();
}
};
class ThreadPool {
private:
const int nThreads;
Worker** threads;
queue<WorkUnit*> workers;
mutex key;
public:
ThreadPool(int nThreads) : nThreads(nThreads) {
this->threads = new Worker*[nThreads];
}
void Initialize() {
for (int i = 0; i < nThreads; i++) {
threads[i] = new WorkerThread(workers, key);
threads[i]->Prepare();
threads[i]->Run();
}
}
void Queue(WorkUnit* item) {
key.lock();
workers.push(item);
key.unlock();
}
};
Practical considerations
- In .NET we could use the ThreadPool class
- However, for some problems our own thread pool might be useful
- Examples: Long running tasks, sleeping tasks with frequent polling
- The WorkUnit interface symbolizes a proper work item
- This allows our own thread pool to return results after completion
- Also we might implement dependencies from one WorkUnit to another
- This concept is implemented in .NET with the TPL
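In Java a comparable idea is available through `CompletableFuture`. The following sketch (hypothetical class `DependentTasksDemo`) shows work items that return results and depend on each other. It illustrates the concept only and is not an equivalent of the TPL API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo: results and dependencies between work items.
class DependentTasksDemo {
    static int run() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // First work item: produces a value on a pool thread.
            CompletableFuture<Integer> first =
                CompletableFuture.supplyAsync(() -> 20, pool);
            // Second work item: starts only after the first has finished.
            CompletableFuture<Integer> second =
                first.thenApplyAsync(x -> x + 1, pool);
            // Third work item: depends on both previous results.
            CompletableFuture<Integer> third =
                second.thenCombine(first, (a, b) -> a + b);
            // Block until the whole chain has completed.
            return third.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // 21 + 20
    }
}
```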
Advantages of multi-threading
The active object pattern
- The AO pattern decouples method execution from method invocation for an object
- The invocation should occur in the client's thread
- The execution should occur in the AO's own thread of control
- The design should make this look transparent (using the proxy pattern)
Active object components
- Proxy that provides an interface to the client
- Worker base class with one class for each method of the proxy
- Activation queue that contains the requested invocations
- Scheduler that picks the request to be processed next
- Servant that processes the requests
- Future that contains the response
Active object diagram
Remarks
- The proxy transforms each request into an instance of a method class
- Possible parameters are stored / used for object creation
- It also creates the appropriate future for storing the result
- The future is initially empty and also has a status field
- The method object has a reference to the future
- The activation queue can be implemented using the Monitor pattern
- The servant has the same interface as the proxy and implements the actual methods
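The interplay of the components might be sketched compactly in Java. This is a simplification under stated assumptions: a single-threaded executor takes over the roles of activation queue and scheduler, `CompletableFuture` serves as the future, and the names `CounterServant`, `CounterProxy` and `ActiveObjectDemo` are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Servant: implements the actual method, written without any locking.
class CounterServant {
    private int total = 0;

    int add(int amount) {
        total += amount;
        return total;
    }
}

// Proxy: same interface as the servant, but returns a future immediately.
class CounterProxy {
    private final CounterServant servant = new CounterServant();
    // Assumption: the executor acts as activation queue and scheduler.
    private final ExecutorService activation = Executors.newSingleThreadExecutor();

    CompletableFuture<Integer> add(int amount) {
        // The lambda is the method object; it captures the parameter.
        return CompletableFuture.supplyAsync(() -> servant.add(amount), activation);
    }

    void stop() {
        activation.shutdown();
    }
}

class ActiveObjectDemo {
    static int run() {
        CounterProxy proxy = new CounterProxy();
        // Invocation happens in the client's thread and does not block.
        CompletableFuture<Integer> first = proxy.add(5);
        CompletableFuture<Integer> second = proxy.add(7);
        // Execution happens in the active object's thread, in request order.
        int result = first.join() + second.join();
        proxy.stop();
        return result;
    }
}
```

Since the requests are executed one after another, the first future holds 5 and the second 12.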
Sample implementation
class Future
{
public bool IsFinished
{
get;
private set;
}
public object Result
{
get;
private set;
}
public void SetResult(object value)
{
if (IsFinished)
return;
IsFinished = true;
Result = value;
}
}
interface IWorker
{
bool CanRun();
void Run();
Future Result { get; }
}
class Servant
{
public double DoWork()
{
var sw = Stopwatch.StartNew();
Console.WriteLine("I am now running ...");
Thread.Sleep(1000);
sw.Stop();
return sw.ElapsedMilliseconds;
}
}
class LongWorker : IWorker
{
Servant servant;
Future result;
DateTime ahead;
public LongWorker()
{
this.servant = new Servant();
this.result = new Future();
this.ahead = DateTime.Now.AddSeconds(10);
}
public Future Result
{
get { return result; }
}
public bool CanRun()
{
return DateTime.Now.CompareTo(ahead) >= 0;
}
public void Run()
{
var value = servant.DoWork();
result.SetResult(value);
}
}
class ShortWorker : IWorker
{
Future result;
public ShortWorker()
{
this.result = new Future();
}
public Future Result
{
get { return result; }
}
public bool CanRun()
{
return true;
}
public void Run()
{
result.SetResult("Short worker finished first!");
}
}
class Scheduler
{
List<IWorker> workers;
static Scheduler current;
Thread thread;
object mlock;
private Scheduler()
{
mlock = new object();
workers = new List<IWorker>();
thread = new Thread(Loop);
thread.Start();
}
void Loop()
{
try
{
while (true)
{
Dispatch();
Thread.Sleep(5);
}
}
catch (ThreadInterruptedException)
{
// Stop() interrupts the thread to end the loop
}
}
public static Scheduler Current
{
get { return current ?? (current = new Scheduler()); }
}
public void Stop()
{
thread.Interrupt();
current = null;
}
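public void Dispatch()
{
IWorker ready = null;
// Select and remove under the lock, since Queue() may add workers concurrently
lock (mlock)
{
for (int i = 0; i < workers.Count; i++)
{
if (workers[i].CanRun())
{
ready = workers[i];
workers.RemoveAt(i);
break;
}
}
}
if (ready == null)
return;
ready.Run();
Console.WriteLine("Worker finished!");
}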
public void Queue(IWorker worker)
{
lock (mlock)
{
workers.Add(worker);
}
}
}
class Proxy
{
public Future RunLongWorker()
{
var w = new LongWorker();
Scheduler.Current.Queue(w);
return w.Result;
}
public Future RunShortWorker()
{
var w = new ShortWorker();
Scheduler.Current.Queue(w);
return w.Result;
}
}
class Future {
private boolean finished;
private Object result;
public boolean getIsFinished() {
return finished;
}
public Object getResult() {
return result;
}
public void setResult(Object value) {
if (finished)
return;
finished = true;
result = value;
}
}
interface Worker {
boolean canRun();
void run();
Future getResult();
}
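class Servant {
public double doWork() {
long start = System.nanoTime();
System.out.println("I am now running ...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// Elapsed time in milliseconds
return (System.nanoTime() - start) / 1e6;
}
}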
class LongWorker implements Worker {
Servant servant;
Future result;
int tries;
public LongWorker() {
this.servant = new Servant();
this.result = new Future();
this.tries = 0;
}
public Future getResult() {
return result;
}
public boolean canRun() {
return tries++ == 10;
}
public void run() {
double value = servant.doWork();
result.setResult(value);
}
}
class ShortWorker implements Worker {
Future result;
public ShortWorker() {
this.result = new Future();
}
public Future getResult() {
return result;
}
public boolean canRun() {
return true;
}
public void run() {
result.setResult("Short worker finished first!");
}
}
class Scheduler {
List<Worker> workers;
static Scheduler current;
Object mlock;
Thread thread;
private Scheduler() {
mlock = new Object();
workers = new ArrayList<Worker>();
thread = new Thread(new Loop());
thread.start();
}
// Inner class, so the loop uses this instance and not getCurrent(),
// which might still be unset while the constructor is running
class Loop implements Runnable {
public void run() {
try {
while (true) {
dispatch();
Thread.sleep(5);
}
} catch (InterruptedException e) {
// stop() interrupts the thread to end the loop
}
}
}
public static synchronized Scheduler getCurrent() {
if (current == null)
current = new Scheduler();
return current;
}
public void stop() {
thread.interrupt();
current = null;
}
public void dispatch() {
Worker ready = null;
// Select and remove under the lock, since queue() may add workers concurrently
synchronized (mlock) {
for (int i = 0; i < workers.size(); i++) {
if (workers.get(i).canRun()) {
ready = workers.remove(i);
break;
}
}
}
if (ready == null)
return;
ready.run();
System.out.println("Worker finished!");
}
public void queue(Worker worker) {
synchronized (mlock) {
workers.add(worker);
}
}
}
class Proxy {
public Future RunLongWorker() {
LongWorker w = new LongWorker();
Scheduler.getCurrent().queue(w);
return w.getResult();
}
public Future RunShortWorker() {
ShortWorker w = new ShortWorker();
Scheduler.getCurrent().queue(w);
return w.getResult();
}
}
class Future {
private:
bool finished;
string result;
public:
// Without initialization the first SetResult call might be skipped
Future() : finished(false) {
}
bool GetIsFinished() {
return finished;
}
string GetResult() {
return result;
}
void SetResult(string value) {
if (finished)
return;
finished = true;
result = value;
}
};
class Worker {
public:
virtual bool CanRun() = 0;
virtual void Run() = 0;
virtual Future* GetResult() = 0;
};
class Servant {
public:
double DoWork() {
cout << "I am now running ..." << endl;
this_thread::sleep_for(chrono::seconds(1));
return 1000.0;
}
};
class LongWorker : public Worker {
private:
Servant servant;
Future* result;
int trials;
public:
LongWorker() {
result = new Future();
trials = 0;
}
Future* GetResult() {
return result;
}
bool CanRun() {
return trials++ == 10;
}
void Run() {
double value = servant.DoWork();
ostringstream strs;
strs << value;
result->SetResult(strs.str());
}
};
class ShortWorker : public Worker {
private:
Future* result;
public:
ShortWorker() {
result = new Future();
}
Future* GetResult() {
return result;
}
bool CanRun() {
return true;
}
void Run() {
result->SetResult("Short worker finished first!");
}
};
class Scheduler {
private:
static Scheduler* current;
list<Worker*> workers;
thread* t;
mutex m;
volatile bool active;
Scheduler() {
active = true;
t = new thread(&Scheduler::Loop, this);
}
Scheduler(Scheduler const&);
void operator=(Scheduler const&);
void Loop() {
while (active) {
Dispatch();
this_thread::sleep_for(chrono::milliseconds(5));
}
}
public:
static Scheduler* GetCurrent() {
if (current == NULL)
current = new Scheduler();
return current;
}
void Stop() {
active = false;
current = NULL;
t->join();
}
void Dispatch() {
Worker* ready = NULL;
// Select and remove under the lock, since Queue() may modify the list concurrently
m.lock();
for (list<Worker*>::iterator it = workers.begin(); it != workers.end(); ++it) {
if ((*it)->CanRun()) {
ready = *it;
workers.erase(it);
break;
}
}
m.unlock();
if (ready == NULL)
return;
ready->Run();
cout << "Worker finished!" << endl;
}
void Queue(Worker* worker) {
m.lock();
workers.push_back(worker);
m.unlock();
}
};
Scheduler* Scheduler::current;
class Proxy {
public:
Future* RunLongWorker() {
auto w = new LongWorker();
Scheduler::GetCurrent()->Queue(w);
return w->GetResult();
}
Future* RunShortWorker() {
auto w = new ShortWorker();
Scheduler::GetCurrent()->Queue(w);
return w->GetResult();
}
};
Practical considerations
- The AO requires the implementation of many classes
- In particular, for every operation on the servant (and proxy) a worker class must be prepared
- Every such class must encode all the parameters of the operation and maintain a future
- Usually this is boring and repetitive code that could be generated
- This is therefore a good candidate for using templates
- Additionally we could take advantage of type lists
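In Java, generics combined with lambdas can take the place of the templates mentioned above. The sketch below uses hypothetical names (`SimpleFuture`, `MethodRequest`, `GenericRequestDemo`). One generic request class replaces the separate worker class per operation. To keep the example deterministic, a plain loop stands in for the scheduler thread.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Supplier;

// Hypothetical typed future with a status flag.
class SimpleFuture<T> {
    private boolean finished;
    private T result;

    synchronized boolean isFinished() { return finished; }

    synchronized T getResult() { return result; }

    synchronized void setResult(T value) {
        if (!finished) {
            finished = true;
            result = value;
        }
    }
}

// One generic method object instead of one class per operation.
class MethodRequest<T> {
    private final Supplier<T> call;
    private final SimpleFuture<T> future = new SimpleFuture<>();

    MethodRequest(Supplier<T> call) {
        this.call = call; // the lambda captures the parameters
    }

    SimpleFuture<T> getFuture() { return future; }

    void run() { future.setResult(call.get()); }
}

class GenericRequestDemo {
    static String run() {
        Queue<MethodRequest<?>> activation = new ArrayDeque<>();
        MethodRequest<Integer> sum = new MethodRequest<>(() -> 2 + 3);
        MethodRequest<String> text = new MethodRequest<>(() -> "done");
        activation.add(sum);
        activation.add(text);
        // Stand-in for the scheduler: process the requests in order.
        for (MethodRequest<?> r = activation.poll(); r != null; r = activation.poll()) {
            r.run();
        }
        return sum.getFuture().getResult() + ":" + text.getFuture().getResult();
    }
}
```

The result type of each operation stays visible to the client through the type parameter of the future.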
Sample active object sequence
The reactor pattern
- GUIs give us a single thread of control: the event loop or reactor
- The structure of such a reactor is as follows:
- Resources in form of Handles
- Event Demultiplexer, which uses the event loop to block on all resources
- Dispatcher, registers / unregisters handlers and dispatches events
- Event Handler, with its assigned Handle (resource)
- The demultiplexer sends the handle to the dispatcher as soon as the resource can be used without blocking
Reactor diagram
Remarks
- A handle might be a system resource (connection, file, ...)
- All reactor systems are single-threaded
- However, they can exist in multi-threaded environments
- Usually communication with threads is possible by using channels
- JavaScript runs in a reactor, with code being single-threaded
- The reactor allows code to run concurrently without cross-threading
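A minimal event loop in the spirit of these remarks could look as follows. This is a sketch with hypothetical names (`MiniReactor`, the `STOP` sentinel) and plain strings as handles. A `BlockingQueue` serves as the channel through which another thread reports ready handles, while all handlers run on the loop's single thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical reactor: strings serve as handles for illustration.
class MiniReactor {
    private static final String STOP = "__stop__"; // made-up sentinel handle
    // Channel between other threads and the event loop.
    private final BlockingQueue<String> ready = new LinkedBlockingQueue<>();
    private final Map<String, Runnable> handlers = new HashMap<>();

    void register(String handle, Runnable handler) {
        handlers.put(handle, handler);
    }

    // May be called from any thread.
    void notifyReady(String handle) {
        ready.add(handle);
    }

    void stop() {
        ready.add(STOP);
    }

    // The event loop: blocks until a handle is ready, then dispatches.
    void loop() throws InterruptedException {
        while (true) {
            String handle = ready.take();
            if (handle.equals(STOP)) {
                return;
            }
            Runnable handler = handlers.get(handle);
            if (handler != null) {
                handler.run(); // handlers run synchronously on this thread
            }
        }
    }

    static List<String> demo() {
        MiniReactor reactor = new MiniReactor();
        List<String> log = new ArrayList<>(); // touched by the loop thread only
        reactor.register("socket", () -> log.add("socket handled"));
        reactor.register("timer", () -> log.add("timer handled"));
        Thread producer = new Thread(() -> {
            reactor.notifyReady("timer");
            reactor.notifyReady("socket");
            reactor.stop();
        });
        producer.start();
        try {
            reactor.loop();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }
}
```

The handlers need no locking of their own, since only the loop thread ever executes them.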
Sample implementation
class Handle
{
//Empty for illustration
}
abstract class EventHandler
{
public abstract Handle Handle { get; }
public abstract void HandleEvent();
}
class ConcreteEventHandler : EventHandler
{
private Handle myhandle;
public override Handle Handle
{
get { return myhandle; }
}
public override void HandleEvent()
{
/* ... */
}
}
class EventDemultiplexer
{
object obj = new object();
Queue<Handle> handles = new Queue<Handle>();
public Handle Select()
{
// Poll in a loop; unbounded recursion would eventually overflow the stack
while (true)
{
lock (obj)
{
if (handles.Count > 0)
return handles.Dequeue();
}
Thread.Sleep(100);
}
}
public void Notify(Handle myhandle)
{
lock (obj)
{
if (!handles.Contains(myhandle))
handles.Enqueue(myhandle);
}
}
}
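class Dispatcher
{
List<EventHandler> handlers;
// The demultiplexer that Handle() selects from; passing it to the constructor is an assumption
EventDemultiplexer source;
public Dispatcher(EventDemultiplexer source)
{
handlers = new List<EventHandler>();
this.source = source;
}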
public void RegisterHandler(EventHandler ev)
{
if (!handlers.Contains(ev))
handlers.Add(ev);
}
public void RemoveHandler(EventHandler ev)
{
if (handlers.Contains(ev))
handlers.Remove(ev);
}
public void Handle()
{
while (true)
{
var handle = source.Select();
foreach (var handler in handlers)
if (handler.Handle == handle)
handler.HandleEvent();
}
}
}
class Handle {
//Empty for illustration
}
abstract class EventHandler {
public abstract Handle getHandle();
public abstract void handleEvent();
}
class ConcreteEventHandler extends EventHandler {
private Handle myhandle;
@Override
public Handle getHandle() {
return myhandle;
}
@Override
public void handleEvent() {
/* ... */
}
}
class EventDemultiplexer {
Object obj = new Object();
Queue<Handle> handles = new LinkedList<Handle>();
public Handle select() {
// Poll in a loop instead of recursing
while (true) {
synchronized (obj) {
if (handles.size() > 0)
return handles.remove();
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new IllegalStateException("Interrupted while waiting", e);
}
}
}
public void notify(Handle myhandle) {
synchronized (obj) {
if (!handles.contains(myhandle))
handles.add(myhandle);
}
}
}
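class Dispatcher {
List<EventHandler> handlers;
// The demultiplexer that handle() selects from; passing it to the constructor is an assumption
EventDemultiplexer source;
public Dispatcher(EventDemultiplexer source) {
handlers = new ArrayList<EventHandler>();
this.source = source;
}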
public void registerHandler(EventHandler ev) {
if (!handlers.contains(ev))
handlers.add(ev);
}
public void removeHandler(EventHandler ev) {
if (handlers.contains(ev))
handlers.remove(ev);
}
public void handle() {
while (true) {
Handle handle = source.select();
for (EventHandler handler : handlers)
if (handler.getHandle() == handle)
handler.handleEvent();
}
}
}
class Handle {
//Empty for illustration
};
class EventHandler {
public:
virtual Handle* GetHandle() = 0;
virtual void HandleEvent() = 0;
};
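class ConcreteEventHandler : public EventHandler {
private:
// The resource this handler is responsible for
Handle* myhandle;
public:
ConcreteEventHandler(Handle* handle) : myhandle(handle) {
}
Handle* GetHandle() {
return myhandle;
}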
void HandleEvent() {
cout << "Start handling pseudo event ... ";
this_thread::sleep_for(chrono::milliseconds(2000));
cout << "done!!" << endl;
}
};
class EventDemultiplexer {
private:
mutex m;
set<Handle*> handles;
public:
Handle* Select() {
chrono::milliseconds wait(100);
// Poll in a loop instead of recursing
while (true) {
m.lock();
if (handles.begin() != handles.end()) {
Handle* handle = *(handles.begin());
handles.erase(handles.begin());
m.unlock();
return handle;
}
m.unlock();
this_thread::sleep_for(wait);
}
}
void Notify(Handle* myhandle) {
m.lock();
handles.insert(myhandle);
m.unlock();
}
};
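class Dispatcher {
private:
set<EventHandler*> handlers;
// The demultiplexer that Dispatch() selects from; passing it to the constructor is an assumption
EventDemultiplexer& source;
public:
Dispatcher(EventDemultiplexer& source) : source(source) {
}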
void RegisterHandler(EventHandler* ev) {
handlers.insert(ev);
}
void RemoveHandler(EventHandler* ev) {
handlers.erase(ev);
}
void Dispatch() {
Handle* handle;
while (true) {
handle = source.Select();
for (set<EventHandler*>::iterator it = handlers.begin(); it != handlers.end(); ++it)
if ((*it)->GetHandle() == handle)
(*it)->HandleEvent();
}
}
};
Practical considerations
- Due to the synchronous calling of event handlers, the reactor gives us simple concurrency
- This is achieved without adding the complexity of multiple threads to the system
- However, the pattern itself is tedious to debug due to the inverted flow of control
- Additionally the single-threaded nature limits the maximum concurrency
- The scalability of the reactor pattern is also quite limited
Sample reactor sequence
Literature
- Lea, Doug (1999). Concurrent Programming in Java: Design Principles and Patterns.
- Schmidt, Douglas (1995). Reactor: An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events.
- Schmidt, Douglas; Stal, Michael; Rohnert, Hans; Buschmann, Frank (2000). Pattern-Oriented Software Architecture, Patterns for Concurrent and Networked Objects.