1 /// Fiber queue helper for writing synchronization objects
2 module mecca.reactor.sync.fiber_queue;
3 
4 // Licensed under the Boost license. Full copyright information in the AUTHORS file
5 
6 import mecca.containers.lists;
7 import mecca.lib.exception;
8 import mecca.lib.time;
9 import mecca.log;
10 import mecca.reactor;
11 
12 /**
13   Implementation of the fiber queue.
14 
15   A fiber queue supports two basic operations: suspend, which causes a fiber to stop execution and wait in the queue, and resume, which wakes
16   up one (or more) suspended fibers.
17 
18   As the name suggests, the queue maintains a strict FIFO order.
19 
20   This should not, typically, be used directly by client code. Instead, it is a helper for developing synchronization objects.
21 
22 Params:
23  Volatile = Sets whether suspend is supported in the case where the fiber queue itself goes out of context before all fibers wake up.
24  */
25 struct FiberQueueImpl(bool Volatile) {
26 private:
27     LinkedListWithOwner!(ReactorFiber*) waitingList;
28 
29 public:
30     @disable this(this);
31 
32     /** Suspends the current fiber until it is awoken.
33         Params:
34             timeout = How long to wait.
35         Throws:
36             TimeoutExpired if the timeout expires.
37 
38             Anything else if someone calls Reactor.throwInFiber
39      */
40     void suspend(Timeout timeout = Timeout.infinite) @safe @nogc {
41         auto ourHandle = theReactor.currentFiberHandle;
42         DBG_ASSERT!"Fiber already has sleep flag when FQ.suspend called"( ! ourHandle.get.flag!"SLEEPING" );
43         bool inserted = waitingList.append(ourHandle.get);
44         DBG_ASSERT!"Fiber %s added to same queue twice"(inserted, ourHandle);
45 
46         ourHandle.get.flag!"SLEEPING" = true;
47         scope(failure) {
48             if( ourHandle.get.flag!"SLEEPING" ) {
49                 ourHandle.get.flag!"SLEEPING" = false;
50             } else {
51                 // We were killed after we were already scheduled to wake up
52                 static if( !Volatile ) {
53                     // Wake up one instead of us. Only do so if there is no chance that the queue itself disappeared
54                     resumeOne();
55                 }
56             }
57         }
58 
59         theReactor.suspendCurrentFiber(timeout);
60 
61         // Since we're the fiber management, the fiber should not be able to exit without going through this point
62         DBG_ASSERT!"Fiber handle for %s became invalid while it slept"(ourHandle.isValid, ourHandle);
63         DBG_ASSERT!"Fiber woke up from sleep without the sleep flag being reset"( ! ourHandle.get.flag!"SLEEPING" );
64 
65         // There are some (perverse) use cases where after wakeup the fiber queue is no longer valid. As such, make sure not to rely on any
66         // member, which is why we disable:
67         // ASSERT!"Fiber %s woken up but not removed from FiberQueue"(ourHandle.get !in waitingList, ourHandle);
68     }
69 
70     static if( !Volatile ) {
71         /** Resumes execution of one fiber.
72          *
73          * Unless there are no viable fibers in the queue, exactly one fiber will be resumed.
74          *
75          * Any fibers with pending exceptions (TimeoutExpired or anything else) do not count as viable, even if they are first in line to be
76          * resumed.
77          *
78          * A volatile FiberQueue cannot provide a reliable resumeOne semantics. In order to not entrap innocent implementers, this method is
79          * only available in the non-volatile version of the queue.
80          *
81          * Params:
82          *    immediate = By default the fiber resumed is appended to the end of the scheduled fibers. Setting immediate to true causes
83          *        it to be scheduled at the beginning of the queue.
84          * Note:
85          *    If the fibers at the head of the queue have pending exceptions, the fiber actually woken might be one that was not in the queue
86          *    when resumeOne was originally called. Most of the time, this is the desired behavior. If not, this might result in a spurious
87          *    wakeup.
88          */
89         FiberHandle resumeOne(bool immediate=false) nothrow @safe @nogc {
90             return internalResumeOne(immediate);
91         }
92     }
93 
94     /** Resumes execution of all pending fibers
95      */
96     void resumeAll() nothrow @safe @nogc {
97         while( !empty ) {
98             internalResumeOne(false);
99         }
100     }
101 
102     /** Reports whether there are pending fibers in the queue.
103         Returns: true if there are no pending fibers.
104      */
105     @property bool empty() const pure nothrow @nogc @safe {
106         return waitingList.empty;
107     }
108 
109 private:
110     FiberHandle internalResumeOne(bool immediate) nothrow @safe @nogc {
111         ReactorFiber* wakeupFiber = waitingList.popHead;
112 
113         if (wakeupFiber is null)
114             return FiberHandle.init;
115 
116         DBG_ASSERT!"FQ Trying to wake up %s which doesn't have SLEEPING set"(wakeupFiber.flag!"SLEEPING", wakeupFiber.identity);
117         wakeupFiber.flag!"SLEEPING" = false;
118         auto handle = FiberHandle( wakeupFiber );
119         theReactor.resumeFiber( handle, immediate );
120 
121         return handle;
122     }
123 }
124 
125 /** A simple type for defining a volatile fiber queue.
126 
127   Please use with extreme care. Writing correct code with a volatile queue is a difficult task. Consider whether you really need it.
128  */
129 alias VolatileFiberQueue = FiberQueueImpl!true;
130 /// A simple type for defining a non-volatile fiber queue.
131 alias FiberQueue = FiberQueueImpl!false;
132 
133 unittest {
134     INFO!"UT fiber queue basic tests"();
135     import std.random;
136 
137     Mt19937 random;
138     random.seed(unpredictableSeed);
139 
140     theReactor.setup();
141     scope(exit) theReactor.teardown();
142 
143     FiberQueue fq;
144     DEBUG!"Fiber queue at %s"( &fq );
145 
146     uint wokeup, timedout;
147 
148     void waiter() {
149         try {
150             Duration waitDuration = dur!"msecs"( uniform!"(]"(20, 200, random) );
151             DEBUG!"Fiber %s waiting for %s"(theReactor.currentFiberHandle, waitDuration.toString);
152             fq.suspend( Timeout(waitDuration) );
153             wokeup++;
154         } catch( TimeoutExpired ex ) {
155             timedout++;
156             DEBUG!"%s timed out"(theReactor.currentFiberHandle);
157         }
158     }
159 
160     enum NumWaiters = 30;
161 
162     void framework() {
163         theReactor.sleep(dur!"msecs"(70));
164 
165         FiberHandle handle;
166         while( !fq.empty ) {
167             handle = fq.resumeOne;
168             assert(handle.isValid);
169             DEBUG!"Woke up %s"(handle);
170         }
171         theReactor.yield;
172 
173         INFO!"Ran test with %s fibers timing out and %s waking up"(timedout, wokeup);
174         assert( timedout + wokeup == NumWaiters, "Incorrect number of fibers finished");
175 
176         theReactor.stop();
177     }
178 
179     foreach(i; 0..NumWaiters)
180         theReactor.spawnFiber(&waiter);
181 
182     theReactor.spawnFiber(&framework);
183 
184     theReactor.start();
185 }
186 
187 unittest {
188     INFO!"UT test fiber exception during wake up"();
189     theReactor.setup();
190     scope(exit) theReactor.teardown();
191 
192     FiberQueue fq;
193     uint finishedCount, exceptionCount;
194 
195     void workerFiber() {
196         try {
197             fq.suspend();
198             finishedCount++;
199         } catch(TimeoutExpired ex) {
200             exceptionCount++;
201         }
202     }
203 
204     void framework() {
205         /* The fibers desired itinerary:
206            We wake up exactly one fiber, so exactly one fiber should wake up.
207            fib1 - enters the FQ, woken up and also killed, counts as exception exit
208            fib2 - enters the FQ, woken up
209            fib3 - killed
210            fib4 - never wakes up
211 
212            All in all, two deaths and one clean exit
213          */
214         FiberHandle fib1 = theReactor.spawnFiber(&workerFiber);
215         theReactor.yield();
216         FiberHandle fib2 = theReactor.spawnFiber(&workerFiber);
217         theReactor.yield();
218         FiberHandle fib3 = theReactor.spawnFiber(&workerFiber);
219         theReactor.yield();
220         FiberHandle fib4 = theReactor.spawnFiber(&workerFiber);
221         theReactor.yield();
222 
223         // Both fibers sleeping
224         fq.resumeOne();
225         theReactor.throwInFiber!TimeoutExpired(fib1);
226         theReactor.throwInFiber!TimeoutExpired(fib3);
227         theReactor.yield();
228         theReactor.yield();
229         theReactor.yield();
230         theReactor.yield();
231 
232         theReactor.stop();
233     }
234 
235     theReactor.spawnFiber(&framework);
236     theReactor.start();
237 
238     INFO!"finished count %s exception count %s"(finishedCount, exceptionCount);
239     ASSERT!"Finished count not 1: %s"(finishedCount==1, finishedCount);
240     ASSERT!"Exception count not 2: %s"(exceptionCount==2, exceptionCount);
241 }