1 /// group related fibers so they can all be killed together, if needed
2 module mecca.reactor.fiber_group;
3 
4 // Licensed under the Boost license. Full copyright information in the AUTHORS file
5 
6 import std.traits;
7 
8 import mecca.containers.lists;
9 import mecca.lib.exception;
10 import mecca.lib.time;
11 import mecca.log;
12 import mecca.reactor;
13 
/// A group of related fibers that may need to be killed together
struct FiberGroup {
private:
    /// Exception injected into (or thrown by) member fibers when the group is closed.
    package class FiberGroupExtinction: FiberInterrupt {mixin ExceptionBody!"FiberGroup killer exception";}
    alias RegisteredFibersList = _LinkedList!(ReactorFiber*, "params.fgChain.next", "params.fgChain.prev", "params.fgChain.owner", false);

    /// Intrusive list links. The list alias above reaches them through `params.fgChain` of each `ReactorFiber`.
    package struct Chain {
        ReactorFiber*           prev, next;
        RegisteredFibersList*   owner;
    }

    /// Fibers currently registered with this group
    RegisteredFibersList fibersList;
    enum State {
        None,       /// Not open (initial state, or fully closed)
        Active,     /// Open: fibers may be spawned and tracked
        Closing,    /// `close` was called, but registered fibers may not have exited yet
    }
    State state;

public:

    @disable this(this);

    ~this() @safe @nogc nothrow {
        ASSERT!"FiberGroup destructed but is not fully closed"(closed);
    }

    /// Result of running a function under `runTracked`
    struct ExecutionResult(T) {
        /// Whether execution finished successfully (false if the group was killed mid-run)
        bool completed = false;
        static if (!is(T == void)) {
            /// Actual execution result (if type is not void)
            T result;
        }
    }

    /// Initialize a new fiber group
    void open() nothrow @safe @nogc {
        ASSERT!"New FiberGroup has fibers registered"(fibersList.empty);
        // The following line has a side effect!! "closed" will set the state to None if it is Closing and all fibers
        // have quit. Since this is an ASSERT (i.e. - does not get compiled away), and since we're overriding the state
        // at the very next statement, this is not a problem.
        ASSERT!"Cannot open fiber group that is not closed: Current state %s"(closed, state);
        state = State.Active;
    }

    /// report whether a fiber group is closed
    ///
    /// This call will return `false` for half-closed groups.
    ///
    /// Note that this property has a side effect: a group that is in the closing state and has no registered fibers
    /// left is moved to the fully closed state before the answer is computed.
    @property bool closed() pure nothrow @safe @nogc {
        // If we closed without waiting, we might still be in Closing state.
        if( state==State.Closing && fibersList.empty )
            state=State.None;

        return state == State.None;
    }

    /**
     * close the group (killing all fibers).
     *
     * It is legal for the calling fiber to be part of the group. If `waitForExit` is true, it will be killed only once
     * the function is done waiting for the other fibers to exit.
     *
     * Calling this function on a group that is not active is a no-op.
     *
     * Params:
     * waitForExit = Whether the function waits for all fibers to exit before returning.
     *
     * Throws:
     * FiberGroupExtinction if the calling fiber is itself a member of the group.
     *
     * Notes:
     * If waitForExit is false, the group cannot be reused before all fibers have actually exited.
     */
    void close(bool waitForExit = true) @safe @nogc {
        if( state!=State.Active )
            return;

        auto cs = theReactor.criticalSection();
        state = State.Closing;

        bool suicide;

        auto ourFiberId = theReactor.currentFiberId;
        foreach(fiber; fibersList.range) {
            if( fiber.identity == ourFiberId ) {
                // We cannot inject an exception into ourselves; remember to throw it explicitly later.
                suicide = true;
                continue;
            }

            // WARN_AS!"Fiber killed by group"(fiber.identity);
            WARN!"Killing fiber %s"(fiber.identity);
            theReactor.throwInFiber!FiberGroupExtinction(FiberHandle(fiber));
        }

        if( !waitForExit ) {
            if( suicide ) {
                WARN!"Fiber commiting suicid as part of group"();
                // The exception is constructed at the throw site rather than up front, so that it is only built when
                // actually needed.
                throw mkEx!FiberGroupExtinction;
            }
            return;
        }

        cs.leave();

        if( suicide )
            removeThisFiber();

        {
            // While we wait we are not registered with the group. If the wait is aborted by an exception, put
            // ourselves back so that whoever registered this fiber (fiberWrapper / runTracked) can still remove it
            // during unwinding without tripping the membership ASSERT.
            scope(failure) {
                if( suicide )
                    fibersList.append(theReactor.currentFiberPtr);
            }

            waitEmpty();
        }
        DEBUG!"All group fibers are now dead. May they rest in pieces"();

        state = State.None;

        if (suicide) {
            // For consistency, add ourselves back.
            addThisFiber(true);
            WARN!"Fiber commiting suicid as part of group"();
            // Constructed only now: nothing is held across the blocking wait above.
            throw mkEx!FiberGroupExtinction;
        }
    }

    /// Wait for all fibers in a group to exit.
    ///
    /// This function does not initiate termination. Use `close` to actually terminate the fibers.
    ///
    /// The same `timeout` value is handed to every individual join.
    ///
    /// NOTE(review): `close` removes the calling fiber from the group before calling this function, presumably
    /// because a fiber cannot join itself. Callers that are members of the group should confirm before using this
    /// directly.
    void waitEmpty(Timeout timeout = Timeout.infinite) @safe @nogc {
        // Wait for all fibers to die an agonizing death
        while( !fibersList.empty ) {
            theReactor.joinFiber( FiberHandle(fibersList.head), timeout );
        }
    }

    /// Return true if the current fiber is a member of the group
    bool isCurrentFiberMember() const nothrow @safe @nogc {
        return theReactor.currentFiberPtr in fibersList;
    }

    // DMDBUG https://issues.dlang.org/show_bug.cgi?id=16206
    // The non-template version must come before the templated version
    /**
     * Spawn a new fiber that will be a member of the group.
     *
     * Arguments and return value are the same as for theReactor.spawnFiber.
     */
    FiberHandle spawnFiber(void delegate() dg) nothrow @safe @nogc {
        static void wrapper(void delegate() dg) @system {
            dg();
        }
        return spawnFiber!wrapper(dg);
    }

    /// ditto
    FiberHandle spawnFiber(alias F)(ParameterTypeTuple!F args) nothrow @safe @nogc {
        ASSERT!"FiberGroup state not Active: %s"(state == State.Active, state);
        alias funcType = typeof(F);
        // The fiber starts in fiberWrapper, which registers it with this group before calling F.
        auto fib = theReactor.spawnFiber( &fiberWrapper!funcType, &F, &this, args );

        return fib;
    }

    /**
     * Conditionally spawn a new fiber, only if the fiber group is currently open.
     *
     * This function is useful in certain racy cases, where the fiber group has been closed, but has not yet finished closing.
     *
     * Params:
     * dg = the delegate to run inside the fiber
     *
     * Returns:
     * The FiberHandle of the new fiber if successful, the invalid FiberHandle if the fiber group is closed or closing.
     */
    FiberHandle spawnFiberIfOpen(void delegate() dg) nothrow @safe @nogc {
        if( state != State.Active )
            return FiberHandle.init;

        return spawnFiber(dg);
    }

    /**
     * Perform a task inside the current fiber as part of the group.
     *
     * This function temporarily adds the current fiber to the group for the sake of performing a specific function.
     * Once that function is done, the fiber leaves the group again.
     *
     * If the group is killed while inside this function, the function returns early and the return type has the member
     * `completed` set to false. If the function ran to completion, `completed` is set to true, and `result` is set to
     * the function's return value (if one exists).
     *
     * If the fiber is already a member of the group when this function is called, the function is simply executed
     * normally.
     */
    @notrace auto runTracked(alias F)(ParameterTypeTuple!F args) {
        alias R = ReturnType!F;
        ExecutionResult!R res;

        if(isCurrentFiberMember()) {
            // No need to attach again. A FiberGroupExtinction thrown here is deliberately not caught, so that it
            // reaches the outermost tracking context.
            return invoke!F(args);
        }

        addThisFiber();
        // Leave the group on every exit path: normal return, group kill, or any other exception.
        scope(exit) removeThisFiber();

        try {
            res = invoke!F(args);
        } catch( FiberGroupExtinction ex ) {
            // res keeps its default completed == false
            WARN!"Fiber %s killed in contained context by FiberGroup"(theReactor.currentFiberId);
        }

        return res;
    }

    /// ditto
    auto runTracked(T)(scope T delegate() dg) {
        static T wrapper(scope T delegate() dg) {
            return dg();
        }
        return runTracked!(wrapper)(dg);
    }

private:
    /// Register the current fiber with the group.
    ///
    /// `duringSuicide` relaxes the state check so that `close` can re-register the calling fiber after the group has
    /// already been marked as closed.
    @notrace void addThisFiber(bool duringSuicide = false) nothrow @safe @nogc {
        ASSERT!"FiberGroup state not Active: %s"(state == State.Active || state==State.None && duringSuicide, state);
        auto fib = theReactor.currentFiberPtr;
        DBG_ASSERT!"Trying to add fiber already in group"( fib !in fibersList );
        DBG_ASSERT!"Trying to add fiber to group which is already member of another group"( fib.params.fgChain.owner is null );
        fibersList.append(fib);
    }

    /// Unregister the current fiber from the group. The fiber must currently be a member.
    @notrace void removeThisFiber() nothrow @safe @nogc {
        ASSERT!"FiberGroup asked to remove fiber which is not a member"(isCurrentFiberMember());
        fibersList.remove(theReactor.currentFiberPtr);
    }

    /// Call `F` and wrap its outcome in an `ExecutionResult` marked as completed.
    ///
    /// If `F` throws, the exception propagates and no result is returned.
    @notrace static auto invoke(alias F)(ParameterTypeTuple!F args) {
        alias R = ReturnType!F;
        ExecutionResult!R res;

        static if (is(R == void)) {
            F(args);
        } else {
            res.result = F(args);
        }
        res.completed = true;

        return res;
    }

    /// Entry point of fibers spawned through the group: registers the fiber, runs `fn`, and unregisters on exit.
    @notrace static void fiberWrapper(T)(T* fn, FiberGroup* fg, ParameterTypeTuple!T args) {
        if( fg.state!=State.Active ) {
            // The group was closed between the spawn request and the fiber's first run; do not run the body at all.
            WARN!"Fiber group closed before fiber managed to start"();
            return;
        }

        fg.addThisFiber();
        scope(exit) fg.removeThisFiber();
        theReactor.setFiberName(theReactor.currentFiberHandle, "FiberGroupMember", fn);
        fn(args);
    }
}
276 
// Basic group behaviour: killing spawned fibers, runTracked with foreign and group exceptions, and fiber suicide.
unittest {
    static int counter;
    counter = 0;

    // Fiber body that never finishes on its own; it may only exit by being killed.
    static void fib(int num) {
        scope(success) assert(false);

        while (true) {
            counter += num;
            theReactor.sleep(msecs(1));
        }
    }

    testWithReactor({
        FiberGroup tracker;

        {
            DEBUG!"#UT Test fibers running and being killed"();
            tracker.open();

            // Make the test fiber itself a member, so that close() has to kill the caller as well.
            tracker.addThisFiber();
            scope(exit) tracker.removeThisFiber();
            tracker.spawnFiber!fib(1);
            tracker.spawnFiber!fib(100);
            theReactor.sleep(msecs(50));
            // this fiber won't get to run
            tracker.spawnFiber!fib(10000);

            bool caught = false;
            try {
                tracker.close();
            }
            catch (tracker.FiberGroupExtinction ex) {
                caught = true;
            }

            theReactor.sleep(2.msecs);

            assert(caught, "this fiber did not commit suicide");
            assert(counter > 0, "no fibers have run");
            assert(counter < 10000, "third fiber should not have run");
        }

        {
            int counter2 = 0;

            tracker.open();

            static class SomeException: Exception {mixin ExceptionBody!"Some exception";}

            // Exceptions other than the group's own must pass through runTracked untouched.
            bool caught = false;
            try {
                tracker.runTracked({
                    throw mkEx!SomeException;
                });
            } catch (SomeException ex) {
                caught = true;
            }
            assert(caught, "exception wasn't passed up");

            tracker.runTracked({
                // Arrange for an unrelated fiber to close the group while the tracked body is still looping.
                theReactor.registerTimer(Timeout(msecs(20)), (){
                    theReactor.spawnFiber({
                        tracker.close();
                    });
                });

                scope(success) assert(false);

                while (true) {
                    counter2++;
                    theReactor.sleep(msecs(1));
                }
            });

            assert(counter2 > 0, "tracked body did not run");
            assert(counter2 < 10000, "tracked body ran for too long before being killed");
        }

        {
            // test fiber suicide
            tracker.open();
            tracker.spawnFiber({tracker.close();});
            theReactor.sleep(msecs(1));
            assert(tracker.closed());
        }
    });
}
365 /*
366    Make sure nested calls work correctly
367  */
// Only the outermost runTracked on a fiber may swallow the group's kill exception.
unittest {
    testWithReactor({
        FiberGroup foo;
        foo.open();

        // Make sure we dont catch the fiberbomb in the nested call
        auto res1 = foo.runTracked({
            foo.runTracked({
                throw mkEx!(foo.FiberGroupExtinction);
            });
            assert(false, "Nested call caught the fiber bomb!");
        });
        assert(res1.completed == false, "res1 marked as completed when shouldn't have");

        // Make sure we dont catch the fiberbomb in a deeply nested call
        auto res2 = foo.runTracked({
            foo.runTracked({
                foo.runTracked({
                    foo.runTracked({
                        foo.runTracked({
                            throw mkEx!(foo.FiberGroupExtinction);
                        });
                        assert(false, "Nested call caught the fiber bomb!");
                    });
                    assert(false, "Nested call caught the fiber bomb!");
                });
                assert(false, "Nested call caught the fiber bomb!");
            });
            assert(false, "Nested call caught the fiber bomb!");
        });
        assert(res2.completed == false, "res2 marked as completed when shouldn't have");

        foo.close();
    });
}
406 
407 /*
408     Spawn fibers on inactive trackers
409 */
// spawnFiberIfOpen must yield a valid handle only while the group is open.
unittest {
    import std.exception;
    import std.stdio;

    testWithReactor({
        FiberGroup group;

        void noop() {}

        // Never opened: the spawn is refused
        auto beforeOpen = group.spawnFiberIfOpen(&noop);
        assert( !beforeOpen.isValid );

        group.open();

        // Open: the spawn goes through
        auto whileOpen = group.spawnFiberIfOpen(&noop);
        assert( whileOpen.isValid );

        group.close();

        // Closed again: the spawn is refused
        auto afterClose = group.spawnFiberIfOpen(&noop);
        assert( !afterClose.isValid );
    });
}
430 
// Closing the group from inside runTracked, both waiting and not waiting for the member fibers to exit.
unittest {
    uint exited = 0;
    FiberGroup tracker;

    // Member fiber body: counts its own exit, however it is brought about.
    void sleeper() {
        scope(exit) {
            exited++;
        }
        theReactor.sleep(10.msecs);
    }

    testWithReactor(
    {
        static void closeGroup(FiberGroup* grp, bool wait) {
            grp.close(wait);
        }

        // Round one: close and wait. Both members must be gone by the time runTracked returns.
        tracker.open();

        tracker.spawnFiber(&sleeper);
        tracker.spawnFiber(&sleeper);

        theReactor.yield();

        tracker.runTracked!closeGroup(&tracker, true);
        assertEQ(exited, 2);

        // Round two: close without waiting. The members only exit once they get to run.
        exited = 0;
        tracker.open();

        tracker.spawnFiber(&sleeper);
        tracker.spawnFiber(&sleeper);

        theReactor.yield();

        tracker.runTracked!closeGroup(&tracker, false);
        assertEQ(exited, 0);
        theReactor.yield();
        assertEQ(exited, 2);

        tracker.close();
    });
}
476 
// A group closed with close(false) stays half-closed, and cannot be reopened, until its fibers have exited.
unittest {
    META!"Make sure that close(false) works properly"();

    int exitCount;

    // Sleeps far longer than the test runs; the only way out is to be killed.
    void sleeper() {
        scope(exit) exitCount++;

        theReactor.sleep(10.days);
    }

    testWithReactor({
        FiberGroup fg;

        fg.open();

        fg.spawnFiber(&sleeper);
        fg.spawnFiber(&sleeper);

        // Spawned fibers have not started yet
        assertEQ(exitCount, 0);
        theReactor.yield();

        // Fibers are running (sleeping) and the group is open
        assertEQ(exitCount, 0);
        assert(!fg.closed);

        fg.close(false);

        // Half-closed: not reported as closed, and reopening is an error
        assert(!fg.closed);
        assertThrows!AssertError(fg.open());

        // Give the killed fibers a chance to exit
        theReactor.yield();
        assert(fg.closed);

        // Fully closed groups are reusable
        fg.open();
        fg.close();
    });
}