1 /** 2 * Level and edge triggers condition with multiple waiters 3 */ 4 module mecca.reactor.sync.event; 5 6 // Licensed under the Boost license. Full copyright information in the AUTHORS file 7 8 import mecca.lib.exception; 9 import mecca.lib.time; 10 import mecca.log; 11 import mecca.reactor; 12 import mecca.reactor.sync.fiber_queue; 13 import mecca.reactor.sync.verbose; 14 15 /** 16 Level trigger condition variable supporting multiple waiters. 17 */ 18 struct Event { 19 private: 20 VolatileFiberQueue waiters; 21 bool currentlySet; 22 EventReporter reporter; 23 24 public: 25 /** 26 * Optional constructor setting the initial state. 27 * 28 * Params: 29 * initialState = true means the Event is initially set. false (default) means it is initially unset. 30 */ 31 this(bool initialState) nothrow @safe @nogc { 32 currentlySet = initialState; 33 } 34 35 /** 36 * Set the event. 37 * 38 * If there are any waiters, release all of them. 39 */ 40 void set() nothrow @safe @nogc { 41 DBG_ASSERT!"Event is set but has fibers waiting"(!isSet || waiters.empty); 42 if (isSet) 43 return; 44 45 currentlySet = true; 46 waiters.resumeAll(); 47 report(SyncVerbosityEventType.HazardOff); 48 } 49 50 /** 51 * Reset the event. 52 * 53 * Any future fiber calling wait will block until another call to set. This is the Event default state. 54 */ 55 void reset() nothrow @safe @nogc { 56 DBG_ASSERT!"Event is set but has fibers waiting"(!isSet || waiters.empty); 57 if( currentlySet ) 58 report(SyncVerbosityEventType.HazardOn); 59 currentlySet = false; 60 } 61 62 /// Report the Event's current state. 63 @property bool isSet() const pure nothrow @safe @nogc { 64 return currentlySet; 65 } 66 67 /** 68 * waits for the event to be set 69 * 70 * If the event is already set, returns without sleeping. 71 * 72 * Params: 73 * timeout = sets a timeout for the wait. 74 * 75 * Throws: 76 * TimeoutExpired if the timeout expires. 77 * 78 * Any other exception injected to this fiber using Reactor.throwInFiber 79 */ 80 void wait(Timeout timeout = Timeout.infinite) @safe @nogc { 81 bool reported; 82 83 while( !isSet ) { 84 if( !reported ) { 85 report(SyncVerbosityEventType.Contention); 86 reported = true; 87 } 88 waiters.suspend(timeout); 89 } 90 91 if( reported ) { 92 report(SyncVerbosityEventType.Wakeup); 93 } 94 95 // We might have gone through without sleeping 96 theReactor.assertMayContextSwitch(); 97 } 98 99 /** 100 * waits for the event to be set with potential spurious wakeups 101 * 102 * If the event is already set, returns without sleeping. 103 * 104 * The main difference between this method and wait is that this method supports the case where the struct holding the Event is freed 105 * while the fiber is sleeping. As a result, two main differences are possible: 106 * $(OL 107 * $(LI Spurious wakeups are possible $(LPAREN)i.e. - `unreliableWait` returns, but the event is not set$(RPAREN) ) 108 * $(LI The VerboseEvent will not report when we wake up from the sleep.) ) 109 * 110 * Proper invocation should use the following pattern: 111 * ---- 112 * while( isEventValid && !event.unreliableWait ) {} 113 * ---- 114 * 115 * It should be pointed out that spurious wakeups only happen when another fiber sets the event. This might still be 116 * a spurious because another fiber might reset the event before the waiting fiber gets a chance to run. If this 117 * is a desired behavior, you might wish to check whether `Signal` fits your needs better. 118 * 119 * Params: 120 * timeout = sets a timeout for the wait. 121 * 122 * Returns: 123 * Returns `true` if the event was already set when the call was made. 124 * 125 * Throws: 126 * `TimeoutExpired` if the timeout expires. 127 * 128 * Any other exception injected to this fiber using `Reactor.throwInFiber` 129 */ 130 bool unreliableWait(Timeout timeout = Timeout.infinite) @safe @nogc { 131 if( isSet ) { 132 theReactor.assertMayContextSwitch(); 133 return true; 134 } 135 136 report(SyncVerbosityEventType.Contention); 137 waiters.suspend(timeout); 138 // Will not report wakeup, as we cannot know that the event still exists: report(SyncVerbosityEventType.Wakeup); 139 140 return false; 141 } 142 143 package: 144 void setVerbosityCallback(EventReporter reporter) nothrow @safe @nogc { 145 this.reporter = reporter; 146 } 147 148 void report(SyncVerbosityEventType type) nothrow @safe @nogc { 149 if( reporter !is null ) 150 reporter(type); 151 } 152 } 153 154 /** 155 * A wrapper around Event that adds verbosity to state changes. 156 * 157 * All state changes will be reported (from set to reset and vice versa). Also, a fiber that has to sleep due to the Event not being set will 158 * also be reported. 159 * 160 * Don't forget to call open, or the event will behave as a usual Event. 161 * 162 * Params: 163 * Name = the display name to show for the Event. 164 * ExtraParam = An optional extra type to provide more context for the specific event instance. The data that goes with this type is 165 * provided as an argument to open. 166 */ 167 template VerboseEvent(string Name, ExtraParam = void) { 168 alias VerboseEvent = SyncVerbosity!(Event, Name, ExtraParam); 169 } 170 171 unittest { 172 //import mecca.reactor.fd; 173 import mecca.reactor; 174 175 theReactor.setup(); 176 scope(exit) theReactor.teardown(); 177 178 VerboseEvent!"UT" evt; 179 evt.open(); 180 181 uint counter; 182 uint doneCount; 183 bool done; 184 185 enum NumWaiters = 30; 186 187 void worker() { 188 while(!done) { 189 theReactor.yield(); 190 evt.wait(); 191 counter++; 192 } 193 194 doneCount++; 195 } 196 197 void framework() { 198 uint savedCounter; 199 200 enum Delay = dur!"msecs"(1); 201 foreach(i; 0..10) { 202 INFO!"Reset event"(); 203 evt.reset(); 204 savedCounter = counter; 205 INFO!"Infra begin sleep"(); 206 theReactor.sleep(Delay); 207 INFO!"Infra end sleep"(); 208 assert(savedCounter == counter, "Worker fibers working while event is reset"); 209 210 INFO!"Set event"(); 211 evt.set(); 212 INFO!"Infra begin sleep2"(); 213 theReactor.sleep(Delay); 214 INFO!"Infra end sleep2"(); 215 assert(savedCounter != counter, "Worker fibers not released despite event set"); 216 } 217 218 INFO!"Reset event end"(); 219 evt.reset(); 220 theReactor.yield(); 221 222 assert(doneCount==0, "Worker fibers exit while not done"); 223 done = true; 224 INFO!"Infra begin sleep end"(); 225 theReactor.sleep(Delay); 226 INFO!"Infra end sleep end"(); 227 228 assert(doneCount==0, "Worker fibers exit with event reset"); 229 INFO!"Set event end"(); 230 evt.set(); 231 INFO!"Infra yeild"(); 232 theReactor.yield(); 233 assert(doneCount==NumWaiters, "Not all worker fibers woke up from event"); 234 235 INFO!"Infra done"(); 236 theReactor.stop(); 237 } 238 239 foreach(i; 0..NumWaiters) 240 theReactor.spawnFiber(&worker); 241 242 theReactor.spawnFiber(&framework); 243 244 245 theReactor.start(); 246 } 247 248 /// Edge trigger condition variable supporting multiple waiters. 249 struct Signal { 250 private: 251 FiberQueue waiters; 252 253 public: 254 /** 255 * waits for the event to trigger 256 * 257 * This function is $(B guaranteed) to sleep. 258 * 259 * Params: 260 * timeout = sets a timeout for the wait. 261 * 262 * Throws: 263 * TimeoutExpired if the timeout expires. 264 * 265 * Any other exception injected to this fiber using Reactor.throwInFiber 266 */ 267 void wait(Timeout timeout = Timeout.infinite) @safe @nogc { 268 waiters.suspend(timeout); 269 } 270 271 /** 272 * Wake up all waiting fibers 273 */ 274 void signal() nothrow @safe @nogc { 275 waiters.resumeAll(); 276 } 277 }