/// Fiber queue helper for writing synchronization objects
module mecca.reactor.sync.fiber_queue;

// Licensed under the Boost license. Full copyright information in the AUTHORS file

import mecca.containers.lists;
import mecca.lib.exception;
import mecca.lib.time;
import mecca.log;
import mecca.reactor;

/**
  Implementation of the fiber queue.

  A fiber queue supports two basic operations: suspend, which causes a fiber to stop execution and wait in the queue, and
  resume, which wakes up one (or more) suspended fibers.

  As the name suggests, the queue maintains a strict FIFO order: suspended fibers are appended to the tail of the waiting
  list, and resumed fibers are popped from its head.

  This should not, typically, be used directly by client code. Instead, it is a helper for developing synchronization
  objects.

  Params:
  Volatile = Sets whether suspend is supported in the case where the fiber queue itself goes out of context before all
             fibers wake up. When true, `resumeOne` is not compiled in, and a fiber that fails after being scheduled to
             wake up does not touch the queue again.
 */
struct FiberQueueImpl(bool Volatile) {
private:
    // Fibers currently suspended on this queue. `suspend` appends to it; `internalResumeOne` pops from its head.
    LinkedListWithOwner!(ReactorFiber*) waitingList;

public:
    // Postblit is disabled: a fiber queue cannot be copied.
    @disable this(this);

    /** Suspends the current fiber until it is awoken.

      The current fiber is appended to the waiting list and marked with the "SLEEPING" flag before control is handed
      back to the reactor. The flag is cleared by whoever resumes the fiber through this queue.

      Params:
      timeout = How long to wait.
      Throws:
      TimeoutExpired if the timeout expires.

      Anything else if someone calls Reactor.throwInFiber
     */
    void suspend(Timeout timeout = Timeout.infinite) @safe @nogc {
        auto ourHandle = theReactor.currentFiberHandle;
        DBG_ASSERT!"Fiber already has sleep flag when FQ.suspend called"( ! ourHandle.get.flag!"SLEEPING" );
        bool inserted = waitingList.append(ourHandle.get);
        DBG_ASSERT!"Fiber %s added to same queue twice"(inserted, ourHandle);

        ourHandle.get.flag!"SLEEPING" = true;
        // Registered before suspending, so it runs if suspendCurrentFiber (or anything after it) exits by throwing.
        scope(failure) {
            if( ourHandle.get.flag!"SLEEPING" ) {
                // The flag is still set, so internalResumeOne never picked this fiber. Just undo the flag.
                // NOTE(review): this path does not unlink the fiber from waitingList itself; presumably the reactor
                // or the list-owner mechanism removes it when the fiber is rescheduled -- confirm.
                ourHandle.get.flag!"SLEEPING" = false;
            } else {
                // We were killed after we were already scheduled to wake up
                static if( !Volatile ) {
                    // Wake up one instead of us. Only do so if there is no chance that the queue itself disappeared
                    resumeOne();
                }
            }
        }

        theReactor.suspendCurrentFiber(timeout);

        // Since we're the fiber management, the fiber should not be able to exit without going through this point
        DBG_ASSERT!"Fiber handle for %s became invalid while it slept"(ourHandle.isValid, ourHandle);
        DBG_ASSERT!"Fiber woke up from sleep without the sleep flag being reset"( ! ourHandle.get.flag!"SLEEPING" );

        // There are some (perverse) use cases where after wakeup the fiber queue is no longer valid. As such, make sure
        // not to rely on any member, which is why we disable:
        // ASSERT!"Fiber %s woken up but not removed from FiberQueue"(ourHandle.get !in waitingList, ourHandle);
    }

    static if( !Volatile ) {
        /** Resumes execution of one fiber.
         *
         * Unless there are no viable fibers in the queue, exactly one fiber will be resumed.
         *
         * Any fibers with pending exceptions (TimeoutExpired or anything else) do not count as viable, even if they are
         * first in line to be resumed.
         *
         * A volatile FiberQueue cannot provide a reliable resumeOne semantics. In order to not entrap innocent
         * implementers, this method is only available in the non-volatile version of the queue.
         *
         * Params:
         * immediate = By default the fiber resumed is appended to the end of the scheduled fibers. Setting immediate to
         *             true causes it to be scheduled at the beginning of the queue.
         * Returns:
         * The handle of the fiber that was resumed, or `FiberHandle.init` if no fiber was available.
         * Note:
         * If the fibers at the head of the queue have pending exceptions, the fiber actually woken might be one that was
         * not in the queue when resumeOne was originally called. Most of the time, this is the desired behavior. If not,
         * this might result in a spurious wakeup.
         */
        FiberHandle resumeOne(bool immediate=false) nothrow @safe @nogc {
            return internalResumeOne(immediate);
        }
    }

    /** Resumes execution of all pending fibers.
     *
     * Repeatedly resumes the fiber at the head of the queue, with `immediate` set to false, until the queue reports
     * that it is empty.
     */
    void resumeAll() nothrow @safe @nogc {
        while( !empty ) {
            internalResumeOne(false);
        }
    }

    /** Reports whether the queue has no pending fibers.
      Returns: true if there are no pending fibers, false if at least one fiber is waiting.
     */
    @property bool empty() const pure nothrow @nogc @safe {
        return waitingList.empty;
    }

private:
    /* Pops the fiber at the head of the waiting list and hands it to the reactor for resumption.
     *
     * Params:
     * immediate = Passed through to theReactor.resumeFiber.
     * Returns:
     * A handle to the resumed fiber, or FiberHandle.init when popHead returned null.
     */
    FiberHandle internalResumeOne(bool immediate) nothrow @safe @nogc {
        ReactorFiber* wakeupFiber = waitingList.popHead;

        if (wakeupFiber is null)
            return FiberHandle.init;

        DBG_ASSERT!"FQ Trying to wake up %s which doesn't have SLEEPING set"(wakeupFiber.flag!"SLEEPING", wakeupFiber.identity);
        // Clearing the flag is what tells suspend() that this fiber was woken through the queue.
        wakeupFiber.flag!"SLEEPING" = false;
        auto handle = FiberHandle( wakeupFiber );
        theReactor.resumeFiber( handle, immediate );

        return handle;
    }
}

/** A simple type for defining a volatile fiber queue.

  Please use with extreme care. Writing correct code with a volatile queue is a difficult task. Consider whether you really
  need it.
 */
alias VolatileFiberQueue = FiberQueueImpl!true;
/// A simple type for defining a non-volatile fiber queue.
alias FiberQueue = FiberQueueImpl!false;

// Basic test: many fibers suspend on the queue with random timeouts; every one of them must end up either woken up or
// timed out.
unittest {
    INFO!"UT fiber queue basic tests"();
    import std.random;

    Mt19937 random;
    random.seed(unpredictableSeed);

    theReactor.setup();
    scope(exit) theReactor.teardown();

    FiberQueue fq;
    DEBUG!"Fiber queue at %s"( &fq );

    // Counters for the two ways a waiter can leave fq.suspend.
    uint wokeup, timedout;

    // Suspends on the queue with a random timeout in the range (20, 200] msecs and records how the wait ended.
    void waiter() {
        try {
            Duration waitDuration = dur!"msecs"( uniform!"(]"(20, 200, random) );
            DEBUG!"Fiber %s waiting for %s"(theReactor.currentFiberHandle, waitDuration.toString);
            fq.suspend( Timeout(waitDuration) );
            wokeup++;
        } catch( TimeoutExpired ex ) {
            timedout++;
            DEBUG!"%s timed out"(theReactor.currentFiberHandle);
        }
    }

    enum NumWaiters = 30;

    // Sleeps for 70 msecs, then wakes every fiber still in the queue and checks the totals.
    void framework() {
        theReactor.sleep(dur!"msecs"(70));

        FiberHandle handle;
        while( !fq.empty ) {
            handle = fq.resumeOne;
            assert(handle.isValid);
            DEBUG!"Woke up %s"(handle);
        }
        // Give the fibers that were just resumed a chance to run and update the counters.
        theReactor.yield;

        INFO!"Ran test with %s fibers timing out and %s waking up"(timedout, wokeup);
        assert( timedout + wokeup == NumWaiters, "Incorrect number of fibers finished");

        theReactor.stop();
    }

    foreach(i; 0..NumWaiters)
        theReactor.spawnFiber(&waiter);

    theReactor.spawnFiber(&framework);

    theReactor.start();
}

// Tests that a wake-up delivered to a fiber which is then killed is passed on to the next fiber in the queue.
unittest {
    INFO!"UT test fiber exception during wake up"();
    theReactor.setup();
    scope(exit) theReactor.teardown();

    FiberQueue fq;
    // finishedCount counts clean returns from fq.suspend; exceptionCount counts TimeoutExpired exits.
    uint finishedCount, exceptionCount;

    void workerFiber() {
        try {
            fq.suspend();
            finishedCount++;
        } catch(TimeoutExpired ex) {
            exceptionCount++;
        }
    }

    void framework() {
        /* The fibers' desired itinerary:
           We wake up exactly one fiber, so exactly one fiber should wake up.
           fib1 - enters the FQ, woken up and also killed, counts as exception exit
           fib2 - enters the FQ, woken up
           fib3 - killed
           fib4 - never wakes up

           All in all, two deaths and one clean exit
         */
        FiberHandle fib1 = theReactor.spawnFiber(&workerFiber);
        theReactor.yield();
        FiberHandle fib2 = theReactor.spawnFiber(&workerFiber);
        theReactor.yield();
        FiberHandle fib3 = theReactor.spawnFiber(&workerFiber);
        theReactor.yield();
        FiberHandle fib4 = theReactor.spawnFiber(&workerFiber);
        theReactor.yield();

        // All four workers have been spawned and given a chance to run, so they should be suspended on fq
        fq.resumeOne();
        theReactor.throwInFiber!TimeoutExpired(fib1);
        theReactor.throwInFiber!TimeoutExpired(fib3);
        // Yield several times so the woken and killed fibers get to run before the reactor is stopped.
        theReactor.yield();
        theReactor.yield();
        theReactor.yield();
        theReactor.yield();

        theReactor.stop();
    }

    theReactor.spawnFiber(&framework);
    theReactor.start();

    INFO!"finished count %s exception count %s"(finishedCount, exceptionCount);
    ASSERT!"Finished count not 1: %s"(finishedCount==1, finishedCount);
    ASSERT!"Exception count not 2: %s"(exceptionCount==2, exceptionCount);
}