1 /**
2  * Level and edge triggers condition with multiple waiters
3  */
4 module mecca.reactor.sync.event;
5 
6 // Licensed under the Boost license. Full copyright information in the AUTHORS file
7 
8 import mecca.lib.exception;
9 import mecca.lib.time;
10 import mecca.log;
11 import mecca.reactor;
12 import mecca.reactor.sync.fiber_queue;
13 import mecca.reactor.sync.verbose;
14 
15 /**
16   Level trigger condition variable supporting multiple waiters.
17  */
18 struct Event {
19 private:
20     VolatileFiberQueue waiters;
21     bool currentlySet;
22     EventReporter reporter;
23 
24 public:
25     /**
26      * Optional constructor setting the initial state.
27      *
28      * Params:
29      * initialState = true means the Event is initially set. false (default) means it is initially unset.
30      */
31     this(bool initialState) nothrow @safe @nogc {
32         currentlySet = initialState;
33     }
34 
35     /**
36      * Set the event.
37      *
38      * If there are any waiters, release all of them.
39      */
40     void set() nothrow @safe @nogc {
41         DBG_ASSERT!"Event is set but has fibers waiting"(!isSet || waiters.empty);
42         if (isSet)
43             return;
44 
45         currentlySet = true;
46         waiters.resumeAll();
47         report(SyncVerbosityEventType.HazardOff);
48     }
49 
50     /**
51      * Reset the event.
52      *
53      * Any future fiber calling wait will block until another call to set. This is the Event default state.
54      */
55     void reset() nothrow @safe @nogc {
56         DBG_ASSERT!"Event is set but has fibers waiting"(!isSet || waiters.empty);
57         if( currentlySet )
58             report(SyncVerbosityEventType.HazardOn);
59         currentlySet = false;
60     }
61 
62     /// Report the Event's current state.
63     @property bool isSet() const pure nothrow @safe @nogc {
64         return currentlySet;
65     }
66 
67     /**
68      * waits for the event to be set
69      *
70      * If the event is already set, returns without sleeping.
71      *
72      * Params:
73      * timeout = sets a timeout for the wait.
74      *
75      * Throws:
76      * TimeoutExpired if the timeout expires.
77      *
78      * Any other exception injected to this fiber using Reactor.throwInFiber
79      */
80     void wait(Timeout timeout = Timeout.infinite) @safe @nogc {
81         bool reported;
82 
83         while( !isSet ) {
84             if( !reported ) {
85                 report(SyncVerbosityEventType.Contention);
86                 reported = true;
87             }
88             waiters.suspend(timeout);
89         }
90 
91         if( reported ) {
92             report(SyncVerbosityEventType.Wakeup);
93         }
94 
95         // We might have gone through without sleeping
96         theReactor.assertMayContextSwitch();
97     }
98 
99     /**
100      * waits for the event to be set with potential spurious wakeups
101      *
102      * If the event is already set, returns without sleeping.
103      *
104      * The main difference between this method and wait is that this method supports the case where the struct holding the Event is freed
105      * while the fiber is sleeping. As a result, two main differences are possible:
106      * $(OL
107      * $(LI Spurious wakeups are possible $(LPAREN)i.e. - `unreliableWait` returns, but the event is not set$(RPAREN) )
108      * $(LI The VerboseEvent will not report when we wake up from the sleep.) )
109      *
110      * Proper invocation should use the following pattern:
111      * ----
112      * while( isEventValid && !event.unreliableWait ) {}
113      * ----
114      *
115      * It should be pointed out that spurious wakeups only happen when another fiber sets the event. This might still be
116      * a spurious because another fiber might reset the event before the waiting fiber gets a chance to run. If this
117      * is a desired behavior, you might wish to check whether `Signal` fits your needs better.
118      *
119      * Params:
120      * timeout = sets a timeout for the wait.
121      *
122      * Returns:
123      * Returns `true` if the event was already set when the call was made.
124      *
125      * Throws:
126      * `TimeoutExpired` if the timeout expires.
127      *
128      * Any other exception injected to this fiber using `Reactor.throwInFiber`
129      */
130     bool unreliableWait(Timeout timeout = Timeout.infinite) @safe @nogc {
131         if( isSet ) {
132             theReactor.assertMayContextSwitch();
133             return true;
134         }
135 
136         report(SyncVerbosityEventType.Contention);
137         waiters.suspend(timeout);
138         // Will not report wakeup, as we cannot know that the event still exists: report(SyncVerbosityEventType.Wakeup);
139 
140         return false;
141     }
142 
143 package:
144     void setVerbosityCallback(EventReporter reporter) nothrow @safe @nogc {
145         this.reporter = reporter;
146     }
147 
148     void report(SyncVerbosityEventType type) nothrow @safe @nogc {
149         if( reporter !is null )
150             reporter(type);
151     }
152 }
153 
154 /**
155  * A wrapper around Event that adds verbosity to state changes.
156  *
157  * All state changes will be reported (from set to reset and vice versa). Also, a fiber that has to sleep due to the Event not being set will
158  * also be reported.
159  *
160  * Don't forget to call open, or the event will behave as a usual Event.
161  *
162  * Params:
163  *  Name = the display name to show for the Event.
164  *  ExtraParam = An optional extra type to provide more context for the specific event instance. The data that goes with this type is
165  *     provided as an argument to open.
166  */
167 template VerboseEvent(string Name, ExtraParam = void) {
168     alias VerboseEvent = SyncVerbosity!(Event, Name, ExtraParam);
169 }
170 
171 unittest {
172     //import mecca.reactor.fd;
173     import mecca.reactor;
174 
175     theReactor.setup();
176     scope(exit) theReactor.teardown();
177 
178     VerboseEvent!"UT" evt;
179     evt.open();
180 
181     uint counter;
182     uint doneCount;
183     bool done;
184 
185     enum NumWaiters = 30;
186 
187     void worker() {
188         while(!done) {
189             theReactor.yield();
190             evt.wait();
191             counter++;
192         }
193 
194         doneCount++;
195     }
196 
197     void framework() {
198         uint savedCounter;
199 
200         enum Delay = dur!"msecs"(1);
201         foreach(i; 0..10) {
202             INFO!"Reset event"();
203             evt.reset();
204             savedCounter = counter;
205             INFO!"Infra begin sleep"();
206             theReactor.sleep(Delay);
207             INFO!"Infra end sleep"();
208             assert(savedCounter == counter, "Worker fibers working while event is reset");
209 
210             INFO!"Set event"();
211             evt.set();
212             INFO!"Infra begin sleep2"();
213             theReactor.sleep(Delay);
214             INFO!"Infra end sleep2"();
215             assert(savedCounter != counter, "Worker fibers not released despite event set");
216         }
217 
218         INFO!"Reset event end"();
219         evt.reset();
220         theReactor.yield();
221 
222         assert(doneCount==0, "Worker fibers exit while not done");
223         done = true;
224         INFO!"Infra begin sleep end"();
225         theReactor.sleep(Delay);
226         INFO!"Infra end sleep end"();
227 
228         assert(doneCount==0, "Worker fibers exit with event reset");
229         INFO!"Set event end"();
230         evt.set();
231         INFO!"Infra yeild"();
232         theReactor.yield();
233         assert(doneCount==NumWaiters, "Not all worker fibers woke up from event");
234 
235         INFO!"Infra done"();
236         theReactor.stop();
237     }
238 
239     foreach(i; 0..NumWaiters)
240         theReactor.spawnFiber(&worker);
241 
242     theReactor.spawnFiber(&framework);
243 
244 
245     theReactor.start();
246 }
247 
248 /// Edge trigger condition variable supporting multiple waiters.
249 struct Signal {
250 private:
251     FiberQueue waiters;
252 
253 public:
254     /**
255      * waits for the event to trigger
256      *
257      * This function is $(B guaranteed) to sleep.
258      *
259      * Params:
260      * timeout = sets a timeout for the wait.
261      *
262      * Throws:
263      * TimeoutExpired if the timeout expires.
264      *
265      * Any other exception injected to this fiber using Reactor.throwInFiber
266      */
267     void wait(Timeout timeout = Timeout.infinite) @safe @nogc {
268         waiters.suspend(timeout);
269     }
270 
271     /**
272      * Wake up all waiting fibers
273      */
274     void signal() nothrow @safe @nogc {
275         waiters.resumeAll();
276     }
277 }