1 /// Cross fibers synchronization point
2 module mecca.reactor.sync.barrier;
3 
4 // Licensed under the Boost license. Full copyright information in the AUTHORS file
5 
6 import mecca.lib.time;
7 import mecca.log;
8 import mecca.reactor.sync.event;
9 
10 /**
11  * A cross fibers synchronization point.
12  *
13  * Barrier has several deployment methods. The basic idea is to divide the fibers into those who need to "check in", and
14  * those that wait for the check in counter to reach the correct amount.
15  *
16  * The most common use case is waiting for launched fibers to finish. To facilitate this mode, the following code
17  * structure is used:
18  * ---
19  * void fiberDlg() {
20  *   scope(exit) barrier.markDone();
21  *   ...
22  * }
23  * ...
24  * theReactor.spawnFiber!fiberDlg();
25  * barrier.addWaiter();
26  * ...
27  * barrier.waitAll();
28  * ---
29  */
30 struct Barrier {
31 private:
32     Event evt = Event(true);
33     uint _numWaiters = 0;
34 
35 public:
36     /**
37      * Increase number of expected completions by one.
38      */
39     void addWaiter() nothrow @safe @nogc {
40         evt.reset();
41         _numWaiters++;
42     }
43 
44     /**
45      * Increase number of completions by one.
46      *
47      * Call this when the completion event the barrier synchronizes on happens. This function does not sleep.
48      */
49     void markDone() nothrow @safe @nogc {
50         assert (_numWaiters > 0, "numWaiters=0");
51         _numWaiters--;
52         if (_numWaiters == 0) {
53             evt.set();
54         }
55     }
56 
57     /**
58      * Report whether there are any fibers we might be waiting for
59      */
60     @property bool hasWaiters() pure const nothrow @safe @nogc {
61         return _numWaiters > 0;
62     }
63 
64     /**
65      * Report how many fibers we are waiting for
66      */
67     @property auto numWaiters() pure const nothrow @safe @nogc {
68         return _numWaiters;
69     }
70 
71     /**
72      * Wait for all completion events to happen.
73      *
74      * Halts the fiber until all expected completion events actually happen.
75      *
76      * Throws:
77      * Will throw TimeoutExpired if the timeout is exceeded.
78      *
79      * May also throw any other exception injected into the fiber.
80      */
81     void waitAll(Timeout timeout = Timeout.infinite) @safe @nogc {
82         evt.wait(timeout);
83     }
84 
85     /**
86      * Mark one completion and wait for all other completions to happen.
87      *
88      * This function is, literally, equivalent to calling markDone followed by waitAll.
89      */
90     void markDoneAndWaitAll(Timeout timeout = Timeout.infinite) @safe @nogc {
91         markDone();
92         waitAll(timeout);
93     }
94 }
95 
96 unittest {
97     import mecca.reactor;
98     import mecca.lib.exception;
99 
100     testWithReactor({
101         Barrier barrier;
102         int count = 0;
103         enum numFibs = 80;
104 
105         foreach(_; 0 .. numFibs) {
106             barrier.addWaiter();
107             theReactor.spawnFiber({
108                 count++;
109                 barrier.markDoneAndWaitAll();
110                 theReactor.yield();
111                 count--;
112             });
113         }
114 
115         barrier.waitAll();
116         assertEQ (count, numFibs);
117         theReactor.yield();
118         theReactor.yield();
119         assertEQ (count, 0);
120     });
121 }