1 /// Cross fibers synchronization point 2 module mecca.reactor.sync.barrier; 3 4 // Licensed under the Boost license. Full copyright information in the AUTHORS file 5 6 import mecca.lib.time; 7 import mecca.log; 8 import mecca.reactor.sync.event; 9 10 /** 11 * A cross fibers synchronization point. 12 * 13 * Barrier has several deployment methods. The basic idea is to divide the fibers into those who need to "check in", and 14 * those that wait for the check in counter to reach the correct amount. 15 * 16 * The most common use case is waiting for launched fibers to finish. To facilitate this mode, the following code 17 * structure is used: 18 * --- 19 * void fiberDlg() { 20 * scope(exit) barrier.markDone(); 21 * ... 22 * } 23 * ... 24 * theReactor.spawnFiber!fiberDlg(); 25 * barrier.addWaiter(); 26 * ... 27 * barrier.waitAll(); 28 * --- 29 */ 30 struct Barrier { 31 private: 32 Event evt = Event(true); 33 uint _numWaiters = 0; 34 35 public: 36 /** 37 * Increase number of expected completions by one. 38 */ 39 void addWaiter() nothrow @safe @nogc { 40 evt.reset(); 41 _numWaiters++; 42 } 43 44 /** 45 * Increase number of completions by one. 46 * 47 * Call this when the completion event the barrier synchronizes on happens. This function does not sleep. 48 */ 49 void markDone() nothrow @safe @nogc { 50 assert (_numWaiters > 0, "numWaiters=0"); 51 _numWaiters--; 52 if (_numWaiters == 0) { 53 evt.set(); 54 } 55 } 56 57 /** 58 * Report whether there are any fibers we might be waiting for 59 */ 60 @property bool hasWaiters() pure const nothrow @safe @nogc { 61 return _numWaiters > 0; 62 } 63 64 /** 65 * Report how many fibers we are waiting for 66 */ 67 @property auto numWaiters() pure const nothrow @safe @nogc { 68 return _numWaiters; 69 } 70 71 /** 72 * Wait for all completion events to happen. 73 * 74 * Halts the fiber until all expected completion events actually happen. 75 * 76 * Throws: 77 * Will throw TimeoutExpired if the timeout is exceeded. 78 * 79 * May also throw any other exception injected into the fiber. 80 */ 81 void waitAll(Timeout timeout = Timeout.infinite) @safe @nogc { 82 evt.wait(timeout); 83 } 84 85 /** 86 * Mark one completion and wait for all other completions to happen. 87 * 88 * This function is, literally, equivalent to calling markDone followed by waitAll. 89 */ 90 void markDoneAndWaitAll(Timeout timeout = Timeout.infinite) @safe @nogc { 91 markDone(); 92 waitAll(timeout); 93 } 94 } 95 96 unittest { 97 import mecca.reactor; 98 import mecca.lib.exception; 99 100 testWithReactor({ 101 Barrier barrier; 102 int count = 0; 103 enum numFibs = 80; 104 105 foreach(_; 0 .. numFibs) { 106 barrier.addWaiter(); 107 theReactor.spawnFiber({ 108 count++; 109 barrier.markDoneAndWaitAll(); 110 theReactor.yield(); 111 count--; 112 }); 113 } 114 115 barrier.waitAll(); 116 assertEQ (count, numFibs); 117 theReactor.yield(); 118 theReactor.yield(); 119 assertEQ (count, 0); 120 }); 121 }