1 /// Define the micro-threading reactor
2 module mecca.reactor;
3 
4 // Licensed under the Boost license. Full copyright information in the AUTHORS file
5 
6 static import posix_signal = core.sys.posix.signal;
7 static import posix_time = core.sys.posix.time;
8 import core.memory: GC;
9 import core.sys.posix.signal;
10 import core.sys.posix.sys.mman: munmap, mprotect, PROT_NONE;
11 import core.thread: thread_isMainThread;
12 
13 import std.exception;
14 import std..string;
15 import std.traits;
16 
17 import mecca.containers.arrays;
18 import mecca.containers.lists;
19 import mecca.containers.pools;
20 import mecca.lib.concurrency;
21 import mecca.lib.consts;
22 import mecca.lib.exception;
23 import mecca.lib.integers : bitsInValue, createBitMask;
24 import mecca.lib.memory;
25 import mecca.lib.reflection;
26 import mecca.lib.time;
27 import mecca.lib.time_queue;
28 import mecca.lib.typedid;
29 import mecca.log;
30 import mecca.log.impl;
31 import mecca.platform.os;
32 import mecca.reactor.fiber_group;
33 import mecca.reactor.fls;
34 import mecca.reactor.impl.fibril: Fibril;
35 import mecca.reactor.subsystems.threading;
36 import mecca.reactor.sync.event: Signal;
37 public import mecca.reactor.types;
38 
39 import std.stdio;
40 
/// Handle for manipulating registered timers.
alias TimerHandle = Reactor.TimerHandle;
/// Type of the per-slot generation counter (`ReactorFiber.incarnationCounter`). The counter is bumped every time
/// a fiber body finishes, which is what lets a `FiberHandle` detect that the slot it refers to has been reused.
alias FiberIncarnation = ushort;

// The slot number in the stacks for the fiber
// (this is also the index of the fiber inside `Reactor.allFibers`; see `FiberHandle.get`)
private alias FiberIdx = AlgebraicTypedIdentifier!("FiberIdx", ushort, ushort.max, ushort.max);
47 
/// Track the fiber's state
///
/// The numeric values are used as indices into `Reactor.Stats.fibersHistogram`, which is sized `FiberState.max+1`.
enum FiberState : ubyte {
    None,       /// Fiber isn't running (its slot is free)
    Starting,   /// Fiber was spawned, but has not yet started running
    Scheduled,  /// Fiber is waiting to run
    Running,    /// Fiber is running
    Sleeping,   /// Fiber is currently suspended
    Done,       /// Fiber has finished running. Transient state: code elsewhere asserts no fiber is observed in it
}
57 
// Exception-handling context swap entry points. Two variants are declared because it is not known in advance
// which one the running binary actually uses; `ReactorFiber.swapEhContextChooser` probes both and latches onto
// the one that returns a non-null context.
// NOTE(review): presumably these are provided by druntime - confirm against the druntime version in use.
private {
    extern(C) void* _d_eh_swapContext(void* newContext) nothrow @nogc;
    extern(C) void* _d_eh_swapContextDwarf(void* newContext) nothrow @nogc;
}
62 
/**
  Control block of a single fiber slot.

  The struct itself is kept at exactly 32 bytes (see the static assert below). Everything bulky lives in
  `OnStackParams`, which `setup` places at the high end of the fiber's stack area.
 */
struct ReactorFiber {
    // Prevent accidental copying
    @disable this(this);

    /// Per-fiber data that `setup` carves out of the last `OnStackParams.sizeof` bytes of the stack area.
    static struct OnStackParams {
        Closure                 fiberBody;              // The callable `wrapper` invokes as the fiber's body
        union {
            // Which member is live depends on the MAIN flag (see the `stackDescriptor` property):
            // ordinary fibers own a descriptor, the main fiber points at the thread's existing one.
            DRuntimeStackDescriptor  _stackDescriptor;
            DRuntimeStackDescriptor* stackDescriptorPtr;
        }
        FiberGroup.Chain        fgChain;                // Must have no owner by the time the fiber terminates
        FLSArea                 flsBlock;               // Switched in by `switchInto` for non-SPECIAL fibers
        ExcBuf                  currExcBuf;             // Holds the exception to be thrown when HAS_EXCEPTION is set
        LogsFiberSavedContext   logsSavedContext;       // Reset to init each time a fiber body is started
        string                  fiberName;              // String identifying what this fiber is doing
        void*                   fiberPtr;               // The same with a numerical value
        Signal                  joinWaiters;            // Signalled by `wrapper` when the fiber body has ended
    }
    /// Bit values stored in `_flags`. Accessed through the `flag!"NAME"` properties.
    enum Flags: ubyte {
        // XXX Do we need CALLBACK_SET?
        CALLBACK_SET   = 0x01,  /// Has callback set
        SPECIAL        = 0x02,  /// Special fiber. Not a normal user fiber
        MAIN           = 0x04,  /// This is the main fiber (i.e. - the "not a fiber")
        SCHEDULED      = 0x08,  /// Fiber currently scheduled to be run
        SLEEPING       = 0x10,  /// Fiber is sleeping on a sync object
        HAS_EXCEPTION  = 0x20,  /// Fiber has pending exception to be thrown in it
        EXCEPTION_BT   = 0x40,  /// Fiber exception needs to have fiber's backtrace
        PRIORITY       = 0x80,  /// Fiber is a high priority one
    }

align(1):
    Fibril                                      fibril;                 // Low level context used for the actual switch
    OnStackParams*                              params;                 // Points into the fiber's stack area. Null after teardown
    LinkedListWithOwner!(ReactorFiber*)*        _owner;                 // NOTE(review): presumably maintained by the list containers - confirm
    FiberIdx                                    _nextId;                // List links are stored as indices rather than pointers
    FiberIdx                                    _prevId;
    FiberIncarnation                            incarnationCounter;     // Incremented every time a fiber body finishes
    ubyte                                       _flags;                 // Bitwise OR of `Flags` values
    FiberState                                  _state;                 // Change only through the `state` property, which keeps the stats in sync
    // Shared (static) pointer to the EH context swap function. Starts out pointing at the probing chooser,
    // which replaces it once it can tell which implementation is in use.
    static extern(C) void* function(void*) @nogc nothrow _swapEhContext = &swapEhContextChooser;

    // We define this struct align(1) for the sole purpose of making the following static assert verify what it's supposed to
    static assert (this.sizeof == 32);  // keep it small and cache-line friendly

    // LinkedQueue access through property
    // The getters convert the stored index to a pointer, the pointer setters convert the other way around.
    @property ReactorFiber* _next() const nothrow @safe @nogc {
        return to!(ReactorFiber*)(_nextId);
    }

    @property void _next(FiberIdx newNext) nothrow @safe @nogc {
        _nextId = newNext;
    }

    @property void _next(ReactorFiber* newNext) nothrow @safe @nogc {
        _nextId = to!FiberIdx(newNext);
    }

    @property ReactorFiber* _prev() const nothrow @safe @nogc {
        return to!(ReactorFiber*)(_prevId);
    }

    @property void _prev(FiberIdx newPrev) nothrow @safe @nogc {
        _prevId = newPrev;
    }

    @property void _prev(ReactorFiber* newPrev) nothrow @safe @nogc {
        _prevId = to!FiberIdx(newPrev);
    }

    /**
      Prepare the fiber slot for use.

      Params:
       stackArea = memory to be used as this fiber's stack. Its last `OnStackParams.sizeof` bytes are taken for
                   the `OnStackParams` block.
       main = true for the main fiber. The main fiber gets the MAIN flag, does not get a fibril entry point and
              uses the thread's existing stack descriptor instead of registering its own.
     */
    @notrace void setup(void[] stackArea, bool main) nothrow @nogc {
        _flags = 0;

        if( !main )
            // The usable stack excludes the params block at its end. Execution starts at `wrapper`
            fibril.set(stackArea[0 .. $ - OnStackParams.sizeof], &wrapper);
        else
            flag!"MAIN" = true;

        params = cast(OnStackParams*)&stackArea[$ - OnStackParams.sizeof];
        setToInit(params);

        if( !main ) {
            params._stackDescriptor.bstack = stackArea.ptr + stackArea.length; // Include params, as FLS is stored there
            params._stackDescriptor.tstack = fibril.rsp;
            params._stackDescriptor.add();
        } else {
            import core.thread: Thread;
            // Thread.m_curr is private, hence the accessMember detour (same trick as in switchTo)
            params.stackDescriptorPtr = cast(DRuntimeStackDescriptor*)accessMember!("m_curr")(Thread.getThis());
            DBG_ASSERT!"MAIN not set on main fiber"( flag!"MAIN" );
        }

        _next = null;
        incarnationCounter = 0;
    }

    /// Undo `setup`: reset the fibril, remove the stack descriptor of non-main fibers and drop the params pointer.
    @notrace void teardown() nothrow @nogc {
        fibril.reset();
        if (!flag!"MAIN") {
            // Only non-main fibers called add() on their own descriptor in setup
            params._stackDescriptor.remove();
        }
        params = null;
    }

    /**
      Switch execution from this fiber to `next`.

      The statements below run in a deliberate order: swap the exception handling context, repoint the thread's
      current stack descriptor, and only then perform the actual stack switch.
     */
    @notrace void switchTo(ReactorFiber* next) nothrow @trusted @nogc {
        pragma(inline, true);
        import core.thread: Thread;

        DRuntimeStackDescriptor* currentSD = stackDescriptor;
        DRuntimeStackDescriptor* nextSD = next.stackDescriptor;

        // Install the incoming fiber's EH context, and remember ours for when we are switched back in
        currentSD.ehContext = _swapEhContext(nextSD.ehContext);

        // Since druntime does not expose the interfaces needed for switching fibers, we need to hack around the
        // protection system to access Thread.m_curr, which is private.
        DRuntimeStackDescriptor** threadCurrentSD = cast(DRuntimeStackDescriptor**)&accessMember!("m_curr")(Thread.getThis());
        *threadCurrentSD = nextSD;

        // NOTE(review): the second argument presumably receives the outgoing stack pointer - confirm in Fibril
        fibril.switchTo(next.fibril, &currentSD.tstack);
    }

    /// The stack descriptor of this fiber: the embedded one for ordinary fibers, the pointed-to one for the main fiber.
    @notrace @property private DRuntimeStackDescriptor* stackDescriptor() @trusted @nogc nothrow {
        if( !flag!"MAIN" ) {
            return &params._stackDescriptor;
        } else {
            return params.stackDescriptorPtr;
        }
    }

    /// The `FiberId` of the current incarnation: the slot index, with the incarnation counter OR-ed in after
    /// being shifted left by `theReactor.maxNumFibersBits`.
    @property FiberId identity() const nothrow @safe @nogc {
        FiberId.UnderlyingType res;
        res = to!FiberIdx(&this).value;
        res |= incarnationCounter << theReactor.maxNumFibersBits;

        return FiberId(res);
    }

    /// Query a single flag, e.g. `flag!"MAIN"`. `NAME` must be the name of a `Flags` member.
    @property bool flag(string NAME)() const pure nothrow @safe @nogc {
        return (_flags & __traits(getMember, Flags, NAME)) != 0;
    }
    /// Set or clear a single flag, e.g. `flag!"MAIN" = true`. `NAME` must be the name of a `Flags` member.
    @property void flag(string NAME)(bool value) pure nothrow @safe @nogc {
        if (value) {
            _flags |= __traits(getMember, Flags, NAME);
        }
        else {
            import mecca.lib.integers: bitComplement;
            _flags &= bitComplement(__traits(getMember, Flags, NAME));
        }
    }

    /// Current state of the fiber
    @property FiberState state() pure const nothrow @safe @nogc {
        return _state;
    }

    /// Change the fiber's state, moving one count from the old state's histogram bucket to the new one's.
    @property void state(FiberState newState) nothrow @safe @nogc {
        DBG_ASSERT!"%s trying to switch state from %s to %s, but histogram claims no fibers in this state. %s"(
                theReactor.stats.fibersHistogram[_state]>0, identity, _state, newState,
                theReactor.stats.fibersHistogram );

        theReactor.stats.fibersHistogram[_state]--;
        theReactor.stats.fibersHistogram[newState]++;

        _state = newState;
    }

    /// Whether the slot currently hosts a fiber: false for `None`, true for every other state.
    /// `Done` is not expected to be observable here and trips an assert.
    @property bool isAlive() const pure nothrow @safe @nogc {
        with(FiberState) switch( state ) {
        case None:
            return false;
        case Done:
            assert(false, "Fiber is in an invalid state Done");
        default:
            return true;
        }
    }

private:
    /**
      Entry point of every non-main fiber (installed by `setup` through `fibril.set`).

      Never returns. Each iteration of the loop runs one fiber body, performs the end-of-life bookkeeping and
      then hands control to `theReactor.fiberTerminated`, whose return value says whether the next iteration
      should skip running a body.
     */
    @notrace void wrapper() nothrow {
        bool skipBody;
        Throwable ex = null;

        try {
            // Nobody else will run it the first time the fiber is started. This results in some unfortunate
            // code duplication
            switchInto();
        } catch (FiberInterrupt ex2) {
            INFO!"Fiber %s killed by FiberInterrupt exception: %s"(identity, ex2.msg);
            skipBody = true;
        } catch(Throwable ex2) {
            // Keep the exception. It is reported and forwarded to the main fiber further down
            ex = ex2;
            skipBody = true;
        }

        while (true) {
            DBG_ASSERT!"skipBody=false with pending exception"(ex is null || skipBody);
            scope(exit) ex = null;

            if( !skipBody ) {
                params.logsSavedContext = LogsFiberSavedContext.init;
                INFO!"Fiber %s started generation %s flags=0x%0x"(identity, incarnationCounter, _flags);

                ASSERT!"Reactor's current fiber isn't the running fiber" (theReactor.thisFiber is &this);
                ASSERT!"Fiber %s is in state %s instead of Running" (state == FiberState.Running, identity, state);

                try {
                    // Perform the actual fiber callback
                    params.fiberBody();
                } catch (FiberInterrupt ex2) {
                    // Being killed by FiberInterrupt is logged but is not treated as an error
                    INFO!"Fiber %s killed by FiberInterrupt exception: %s"(identity, ex2.msg);
                } catch (Throwable ex2) {
                    ex = ex2;
                }
            }

            ASSERT!"Fiber still member of fiber group at termination" (params.fgChain.owner is null);

            params.joinWaiters.signal();
            if( ex is null ) {
                INFO!"Fiber %s finished"(identity);
            } else {
                ERROR!"Fiber %s finished with exception: %s"(identity, ex.msg);
                LOG_EXCEPTION(ex);
            }

            // Scrub the per-run data so the slot can host another fiber
            params.fiberBody.clear();
            params.fiberName = null;
            params.fiberPtr = null;
            flag!"CALLBACK_SET" = false;
            ASSERT!"Fiber %s state is %s instead of Running at end of execution"(
                    state == FiberState.Running, identity, state);
            state = FiberState.Done;
            // From here on `identity` differs, so FiberHandles taken for the finished run no longer match
            incarnationCounter++;
            if (ex !is null) {
                // Not expected to come back from here, hence the assert
                theReactor.forwardExceptionToMain(ex);
                assert(false);
            } else {
                skipBody = theReactor.fiberTerminated();
            }
        }
    }

    /**
      Bookkeeping performed on the fiber being switched into.

      Installs the fiber's exception buffer, fiber local storage and logging context, runs a pending cross fiber
      hook if there is one, and finally throws the exception injected into this fiber, if any.

      Throws: the pending `Throwable`, when the HAS_EXCEPTION flag is set.
     */
    void switchInto() @safe @nogc {
        // We might have a cross fiber hook. If we do, we need to repeat this part several times
        bool switched = false;
        do {
            switchCurrExcBuf( &params.currExcBuf );
            if (!flag!"SPECIAL") {
                params.flsBlock.switchTo();
            } else {
                // Special fibers run with no FLS area selected
                FLSArea.switchToNone();
            }
            logSwitchInto();

            if( theReactor.crossFiberHook !is null ) {
                theReactor.performCrossFiberHook();
            } else {
                switched = true;
            }
        } while(!switched);

        if (flag!"HAS_EXCEPTION") {
            Throwable ex = params.currExcBuf.get();
            ASSERT!"Fiber has exception, but exception is null"(ex !is null);
            if (flag!"EXCEPTION_BT") {
                params.currExcBuf.setTraceback(ex);
                flag!"EXCEPTION_BT" = false;
            }

            // Clear the flag before throwing, so the exception is delivered only once
            flag!"HAS_EXCEPTION" = false;
            throw ex;
        }
    }

    /// Tell the logging subsystem which fiber is now running, handing it this fiber's saved log context.
    void logSwitchInto() nothrow @safe @nogc{
        logSwitchFiber(&params.logsSavedContext, cast( Parameters!logSwitchFiber[1] )identity.value);
    }

private:
    /**
      Initial value of `_swapEhContext`.

      Calls both candidate swap functions and, once one of them returns a non-null previous context, points
      `_swapEhContext` directly at that one. While both return null the choice is postponed to a later call.

      Params:
       newContext = the EH context to install. Expected to be null for as long as this chooser is in use.

      Returns: the previous context reported by the chosen implementation, or null if no choice could be made.
     */
    static extern(C) void* swapEhContextChooser(void * newContext) @nogc nothrow @notrace {
        DBG_ASSERT!"Context is not null on first invocation"(newContext is null);
        void* std = _d_eh_swapContext(newContext);
        void* dwarf = _d_eh_swapContextDwarf(newContext);

        if( std !is null ) {
            _swapEhContext = &_d_eh_swapContext;
            return std;
        } else if( dwarf !is null ) {
            _swapEhContext = &_d_eh_swapContextDwarf;
            return dwarf;
        }

        // Cannot tell which is correct yet
        return null;
    }
}
356 
357 
/**
  A handle to a running fiber.

  This handle expires automatically when the fiber stops running. Unless you know, semantically, that a fiber is still running, don't assume
  there is a running fiber attached to this handle.

  The handle will correctly claim invalidity even if a new fiber is launched with the same FiberId.
 */
struct FiberHandle {
private:
    FiberId identity;               // Id the fiber had when the handle was taken
    FiberIncarnation incarnation;   // The fiber slot's incarnation counter at that time

public:
    /// Returns whether the handle was set
    ///
    /// Unlike `isValid`, this does not check whether the handle is still valid. It only returns whether the handle
    /// is in initialized state.
    @property bool isSet() pure const nothrow @safe @nogc {
        return identity.isValid;
    }

    /// Returns whether the handle currently describes a running fiber.
    @property bool isValid() const nothrow @safe @nogc {
        return get() !is null;
    }

    /// the `FiberId` described by the handle. If the handle is no longer valid, will return FiberId.invalid
    ///
    /// Use `getFiberId` if you want the `FiberId` of a no-longer valid handle.
    @property FiberId fiberId() const nothrow @safe @nogc {
        if( isValid )
            return identity;

        return FiberId.invalid;
    }

    /// returns the original `FiberId` set for the handle, whether still valid or not
    FiberId getFiberId() const nothrow @safe @nogc pure {
        return identity;
    }

    /// Reset the handle to uninitialized state
    @notrace void reset() nothrow @safe @nogc {
        this = FiberHandle.init;
    }

package:
    /// Construct a handle referring to `fib`. A null `fib` yields an unset handle.
    this(ReactorFiber* fib) nothrow @safe @nogc {
        opAssign(fib);
    }

    /// Point the handle at `fib`. Assigning null leaves the handle unset.
    auto ref opAssign(ReactorFiber* fib) nothrow @safe @nogc {
        if (fib) {
            identity = fib.identity;
            incarnation = fib.incarnationCounter;
        }
        else {
            identity = FiberId.invalid;
            // Also drop the incarnation of whatever the handle referred to before. Struct equality is
            // memberwise, so a leftover value would make two unset handles compare as different.
            incarnation = FiberIncarnation.init;
        }
        return this;
    }

    /// Resolve the handle to its fiber.
    ///
    /// Returns: the fiber, or null if the reactor is not running, the handle is unset, the fiber slot is free
    /// or the slot has moved on to another incarnation since the handle was taken.
    ReactorFiber* get() const nothrow @safe @nogc {
        if (!theReactor.isRunning)
            return null;

        if (!identity.isValid) {
            return null;
        }

        ReactorFiber* fiber = &theReactor.allFibers[to!FiberIdx(identity).value];

        DBG_ASSERT!"Fiber state is transient state Done"(fiber.state != FiberState.Done);
        if(fiber.state == FiberState.None || fiber.incarnationCounter != incarnation) {
            return null;
        }

        return fiber;
    }
}
439 
440 /**
441   The main scheduler for the micro-threading architecture.
442  */
443 struct Reactor {
444     // Prevent accidental copying
445     @disable this(this);
446 
447     /// Delegates passed to `registerIdleCallback` must be of this signature
448     alias IdleCallbackDlg = bool delegate(Duration);
449 
    /// The options control aspects of the reactor's operation
    ///
    /// Passed to `setup`, which keeps a copy in `optionsInEffect`.
    struct OpenOptions {
        /// Maximum number of fibers. Must be greater than the number of special fibers (asserted by `setup`).
        ushort   numFibers = 256;
        /// Stack size of each fiber (except the main fiber). The reactor will allocate numFibers*fiberStackSize during startup
        ///
        /// `setup` rounds this up to whole pages and adds one more page per fiber, the first page of each stack
        /// being made inaccessible to serve as a guard zone.
        size_t   fiberStackSize = 32*KB;
        /**
          How often does the GC's collection run.

          The reactor uses unconditional periodic collection, rather than lazy evaluation one employed by the default GC
          settings. This setting sets how often the collection cycle should run. See `gcRunThreshold` for how to not
          run the GC collection when not needed.
         */
        Duration gcInterval = 30.seconds;

        /**
         * Allocation threshold to trigger a GC run
         *
         * If the amount of memory allocated since the previous GC run is less than this amount of bytes, the GC scan
         * will be skipped.
         *
         * Setting this value to 0 forces a run every `gcInterval`, regardless of how much was allocated.
         */
        size_t gcRunThreshold = 16*MB;

        /**
          Base granularity of the reactor's timer.

          Any scheduled task is scheduled at no better accuracy than timerGranularity. $(B In addition to) the fact that
          a timer task may be delayed. As such, with a 1ms granularity, a task scheduled for 1.5ms from now is the same
          as a task scheduled for 2ms from now, which may run 3ms from now.
         */
        Duration timerGranularity = 1.msecs;
        /**
          Hogger detection threshold.

          A hogger is a fiber that does not release the CPU to run other tasks for a long period of time. Often, this is
          a result of a bug (i.e. - calling the OS's `sleep` instead of the reactor's).

          Hogger detection works by measuring how long each fiber took until it allows switching away. If the fiber took
          more than hoggerWarningThreshold, a warning is logged.
         */
        Duration hoggerWarningThreshold = 200.msecs;
        /**
         Maximum desired fiber run time

         A fiber should aim not to run more than this much time without a voluntary context switch. This value affects
         the `shouldYield` and `considerYield` calls.
         */
        Duration maxDesiredRunTime = 150.msecs;
        /**
          Hard hang detection.

          This is a similar safeguard to that used by the hogger detection. If activated (disabled by default), it
          preemptively prompts the reactor every set time to see whether fibers are still being switched in/out. If it
          finds that the same fiber is running, without switching out, for too long, it terminates the entire program.

          `setup` configures the detector's timer with a period of a quarter of this value.
         */
        Duration hangDetectorTimeout = Duration.zero;
        /**
          Whether to enable fault handlers

          The fault handlers catch program faults (such as segmentation faults) and log them with their backtrace.
         */
        bool faultHandlersEnabled = true;

        /// Maximal number of timers that can be simultaneously registered.
        size_t   numTimers = 10_000;

        /// Number of threads servicing deferred tasks
        uint numThreadsInPool = 4;
        /// Worker thread stack size
        size_t threadStackSize = 512*KB;

        /// Whether we have enabled deferToThread
        ///
        /// The thread pool is only opened by `setup` (and closed by `teardown`) when this is set.
        bool threadDeferralEnabled;

        /**
          Whether the reactor should register the default (fd processing) idle handler

          If this value is set to `false`, the poller is still opened. Its idle function would not be automatically
          called, however, so file operations might block indefinitely unless another mechanism (such as timer based)
          is put in place to call it periodically.

          The non-registered idle handler can be manually triggered by calling `poller.poll`.
         */
        bool registerDefaultIdler = true;

        version(unittest) {
            /// Disable all GC collection during the reactor run time. Only available for UTs.
            bool utGcDisabled;
        } else {
            // Outside of unit tests this is a compile time constant, so code reading it needs no version block
            private enum utGcDisabled = false;
        }
    }
544 
545     /// Used by `reportStats` to report statistics about the reactor
546     struct Stats {
547         ulong[FiberState.max+1] fibersHistogram;        /// Total number of user fibers in each `FiberState`
548         ulong numContextSwitches;                       /// Number of time we switched between fibers
549         ulong idleCycles;                               /// Total number of idle cycles
550 
551         /// Returns number of currently used fibers
552         @property ulong numUsedFibers() pure const nothrow @safe @nogc {
553             ulong ret;
554 
555             with(FiberState) {
556                 foreach( state; [Starting, Scheduled, Running, Sleeping]) {
557                     ret += fibersHistogram[state];
558                 }
559             }
560 
561             return ret;
562         }
563 
564         /// Returns number of fibers available to be run
565         @property ulong numFreeFibers() pure const nothrow @safe @nogc {
566             ulong ret;
567 
568             with(FiberState) {
569                 foreach( state; [None]) {
570                     ret += fibersHistogram[state];
571                 }
572 
573                 DBG_ASSERT!"%s fibers in Done state. Should be 0"(fibersHistogram[Done] == 0, fibersHistogram[Done]);
574             }
575 
576             return ret;
577         }
578     }
579 
private:
    enum MAX_IDLE_CALLBACKS = 16;       // Capacity of the fixed idleCallbacks array. Beyond it the list spills to GC memory
    enum TIMER_NUM_BINS = 256;          // Passed to CascadingTimeQueue (see timeQueue below)
    enum TIMER_NUM_LEVELS = 4;          // Passed to CascadingTimeQueue (see timeQueue below)
    enum MAX_DEFERRED_TASKS = 1024;     // Passed to ThreadPool (see threadPool below)
    version(unittest) package enum UT_MAX_DEFERRED_TASKS = MAX_DEFERRED_TASKS;

    // Size of the inaccessible area at the low end of each fiber stack (mprotect-ed to PROT_NONE by setup)
    enum GUARD_ZONE_SIZE = SYS_PAGE_SIZE;

    // The main fiber and the idle fiber. Fibers with a lower index than this never enter the free list
    enum NUM_SPECIAL_FIBERS = 2;
    enum ZERO_DURATION = Duration.zero;

    enum MainFiberId = FiberId(0);
    enum IdleFiberId = FiberId(1);

    bool _open;                 // Set by setup, cleared by teardown
    bool _running;
    bool _stopping;
    bool _gcCollectionNeeded;
    bool _gcCollectionForce;
    bool _hangDetectorEnabled;
    ubyte maxNumFibersBits;     // Number of bits sufficient to represent the maximal number of fibers
    bool nothingScheduled; // true if there is no scheduled fiber
    enum HIGH_PRIORITY_SCHEDULES_RATIO = 2; // How many high priority schedules before scheduling a low priority fiber
    ubyte highPrioritySchedules; // Number of high priority schedules done
    FiberIdx.UnderlyingType maxNumFibersMask;   // Bit mask with the maxNumFibersBits low bits set
    int reactorReturn;
    int criticalSectionNesting; // Must be back to zero by the time teardown is called
    OpenOptions optionsInEffect;        // Copy of the options setup was called with
    Stats stats;
    GC.Stats lastGCStats;

    MmapBuffer fiberStacks;             // One contiguous allocation holding the stacks (and guard zones) of all fibers
    MmapArray!ReactorFiber allFibers;   // Indexed by fiber slot number
    LinkedQueueWithLength!(ReactorFiber*) freeFibers;
    enum FiberPriorities { NORMAL, HIGH, IMMEDIATE };
    LinkedListWithOwner!(ReactorFiber*) scheduledFibersNormal, scheduledFibersHigh, scheduledFibersImmediate;

    ReactorFiber* _thisFiber;
    ReactorFiber* mainFiber;            // Slot MainFiberId of allFibers
    ReactorFiber* idleFiber;            // Slot IdleFiberId of allFibers. Runs idleLoop
    FixedArray!(IdleCallbackDlg, MAX_IDLE_CALLBACKS) idleCallbacks;
    // Point to idleCallbacks as a range, in case it gets full and we need to spill over to GC allocation
    IdleCallbackDlg[] actualIdleCallbacks;
    __gshared Timer hangDetectorTimer;

    SignalHandlerValue!TscTimePoint fiberRunStartTime;
    void delegate() nothrow @nogc @safe crossFiberHook;     // When not null, fibers run it as they are switched into
    FiberHandle crossFiberHookCaller;   // FiberHandle to return to after performing the hook

    alias TimedCallbackGeneration = TypedIdentifier!("TimedCallbackGeneration", ulong, ulong.max, ulong.max);
    // A single registered timer. Instances come from timedCallbacksPool and are queued on timeQueue
    struct TimedCallback {
        TimedCallback* _next, _prev;
        timeQueue.OwnerAttrType _owner;
        TimedCallbackGeneration generation;
        TscTimePoint timePoint;
        ulong intervalCycles; // How many cycles between repetitions. Zero means non-repeating

        Closure closure;
    }

    // TODO change to mmap pool or something
    SimplePool!(TimedCallback) timedCallbacksPool;
    TimedCallbackGeneration.Allocator timedCallbackGeneration;
    CascadingTimeQueue!(TimedCallback*, TIMER_NUM_BINS, TIMER_NUM_LEVELS, true) timeQueue;

    ThreadPool!MAX_DEFERRED_TASKS threadPool;
    version(unittest) package ref auto utThreadPool() inout { return threadPool; }
648 
649 public:
650     /// Report whether the reactor has been properly opened (i.e. - setup has been called).
651     @property bool isOpen() const pure nothrow @safe @nogc {
652         return _open;
653     }
654 
655     /// Report whether the reactor is currently active
656     ///
657     /// Unlike `isRunning`, this will return `false` during the reactor shutdown.
658     @property bool isActive() const pure nothrow @safe @nogc {
659         return _running && !_stopping;
660     }
661 
662     /// Report whether the reactor is currently running
663     @property bool isRunning() const pure nothrow @safe @nogc {
664         return _running;
665     }
666 
    /**
      Set the reactor up for doing work.

      All options must be set before calling this function.

      Must be called from the main thread, on a reactor that is not already open.

      Params:
       options = the configuration to use. A copy is kept in `optionsInEffect`.

      Throws: `ErrnoException` if protecting a stack guard zone fails.
     */
    void setup(OpenOptions options = OpenOptions.init) {
        INFO!"Setting up reactor"();

        assert (!isOpen, "reactor.setup called twice");
        _open = true;
        assert (thread_isMainThread);
        assert (options.numFibers > NUM_SPECIAL_FIBERS);
        reactorReturn = 0;
        optionsInEffect = options;

        // Number of bits needed for the highest fiber index. ReactorFiber.identity places the incarnation
        // counter right above these bits.
        maxNumFibersBits = bitsInValue(optionsInEffect.numFibers - 1);
        maxNumFibersMask = createBitMask!(FiberIdx.UnderlyingType)(maxNumFibersBits);

        // All fibers start out in the None state. The state setter keeps the histogram balanced from here on
        stats = Stats.init;
        stats.fibersHistogram[FiberState.None] = options.numFibers;

        // Requested size rounded up to whole pages, plus one more page that becomes the guard zone
        const stackPerFib = (((options.fiberStackSize + SYS_PAGE_SIZE - 1) / SYS_PAGE_SIZE) + 1) * SYS_PAGE_SIZE;
        fiberStacks.allocate(stackPerFib * options.numFibers);
        allFibers.allocate(options.numFibers);

        _thisFiber = null;
        criticalSectionNesting = 0;
        idleCallbacks.length = 0;
        actualIdleCallbacks = null;

        foreach(i, ref fib; allFibers) {
            auto stack = fiberStacks[i * stackPerFib .. (i + 1) * stackPerFib];
            // Make the lowest part of the stack inaccessible
            errnoEnforce(mprotect(stack.ptr, GUARD_ZONE_SIZE, PROT_NONE) == 0, "Guard zone protection failed");
            // Slot 0 is the main fiber
            fib.setup(stack[GUARD_ZONE_SIZE .. $], i==0);

            // The special fibers (main and idle) are never handed out for running user code
            if (i >= NUM_SPECIAL_FIBERS) {
                freeFibers.append(&fib);
            }
        }

        mainFiber = &allFibers[MainFiberId.value];
        mainFiber.flag!"SPECIAL" = true;
        mainFiber.flag!"CALLBACK_SET" = true;
        mainFiber.state = FiberState.Running;
        setFiberName(mainFiber, "mainFiber", &mainloop);

        idleFiber = &allFibers[IdleFiberId.value];
        idleFiber.flag!"SPECIAL" = true;
        idleFiber.flag!"CALLBACK_SET" = true;
        idleFiber.params.fiberBody.set(&idleLoop);
        idleFiber.state = FiberState.Sleeping;
        setFiberName(idleFiber, "idleFiber", &idleLoop);

        timedCallbacksPool.open(options.numTimers, true);
        timeQueue.open(options.timerGranularity);

        if( options.faultHandlersEnabled )
            registerFaultHandlers();

        if( options.threadDeferralEnabled )
            threadPool.open(options.numThreadsInPool, options.threadStackSize);

        import mecca.reactor.io.fd;
        _openReactorEpoll();

        if(options.registerDefaultIdler) {
            import mecca.reactor.subsystems.poller;
            theReactor.registerIdleCallback(&poller.reactorIdle);
        }

        import mecca.reactor.io.signals;
        reactorSignal._open();

        // The timer is only constructed here, not started
        enum TIMER_GRANULARITY = 4; // Number of wakeups during the monitored period
        Duration threshold = optionsInEffect.hangDetectorTimeout / TIMER_GRANULARITY;
        hangDetectorTimer = Timer(threshold, &hangDetectorHandler);
    }
744 
    /**
      Shut the reactor down.

      Releases what `setup` acquired and returns the reactor to the not-open state. The reactor must be open,
      must not be running and must not be inside a critical section.
     */
    void teardown() {
        INFO!"Tearing down reactor"();

        ASSERT!"reactor teardown called on non-open reactor"(isOpen);
        ASSERT!"reactor teardown called on still running reactor"(!isRunning);
        ASSERT!"reactor teardown called inside a critical section"(criticalSectionNesting==0);

        import mecca.reactor.io.signals;
        reactorSignal._close();

        import mecca.reactor.io.fd;
        _closeReactorEpoll();

        // Fibers must be torn down while their stacks are still mapped, as their params live on those stacks
        foreach(i, ref fib; allFibers) {
            fib.teardown();
        }

        if( optionsInEffect.threadDeferralEnabled )
            threadPool.close();
        switchCurrExcBuf(null);

        // disableGCTracking();

        if( optionsInEffect.faultHandlersEnabled )
            deregisterFaultHandlers();

        allFibers.free();
        fiberStacks.free();
        timeQueue.close();
        timedCallbacksPool.close();

        // The lists refer to fibers that no longer exist. Reset them rather than unlinking element by element
        setToInit(freeFibers);
        setToInit(scheduledFibersNormal);
        setToInit(scheduledFibersHigh);
        setToInit(scheduledFibersImmediate);
        nothingScheduled = true;

        _thisFiber = null;
        mainFiber = null;
        idleFiber = null;
        idleCallbacks.length = 0;
        actualIdleCallbacks = null;

        _open = false;
    }
793 
794     /**
795       Register an idle handler callback.
796 
797       The reactor handles scheduling and fibers switching, but has no built-in mechanism for scheduling sleeping fibers
798       back to execution (except fibers sleeping on a timer, of course). Mechanisms such as file descriptor watching are
799       external, and are registered using this function.
800 
801       The idler callback should receive a timeout variable. This indicates the time until the closest timer expires. If
802       no other event comes in, the idler should strive to wake up after that long. Waking up sooner will, likely, cause
803       the idler to be called again. Waking up later will delay timer tasks (which is allowed by the API contract).
804 
805       It is allowed to register more than one idler callback. Doing so, however, will cause $(B all) of them to be
806       called with a timeout of zero (i.e. - don't sleep), resulting in a busy wait for events.
807 
808       The idler is expected to return a boolean value indicating whether the time spent inside the idler is to be
809       counted as idle time, or whether that's considered "work". This affects the results returned by the `idleCycles`
810       field returned by the `reactorStats`.
811      */
812     void registerIdleCallback(IdleCallbackDlg dg) nothrow @safe {
813         // You will notice our deliberate lack of function to unregister
814         if( actualIdleCallbacks.length==idleCallbacks.capacity ) {
815             WARN!"Idle callbacks capacity reached - switching to GC allocated list"();
816         }
817 
818         if( actualIdleCallbacks.length<idleCallbacks.capacity ) {
819             idleCallbacks ~= dg;
820             actualIdleCallbacks = idleCallbacks[];
821         } else {
822             actualIdleCallbacks ~= dg;
823         }
824 
825         DEBUG!"%s idle callbacks registered"(actualIdleCallbacks.length);
826     }
827 
828     /**
829      * Spawn a new fiber for execution.
830      *
831      * Parameters:
832      *  The first argument must be the function/delegate to call inside the new fiber. If said callable accepts further
833      *  arguments, then they must be provided as further arguments to spawnFiber.
834      *
835      * Returns:
836      *  A FiberHandle to the newly created fiber.
837      */
838     @notrace FiberHandle spawnFiber(T...)(T args) nothrow @safe @nogc {
839         static assert(T.length>=1, "Must pass at least the function/delegate to spawnFiber");
840         static assert(isDelegate!(T[0]) || isFunctionPointer!(T[0]),
841                 "spawnFiber first argument must be function or delegate");
842         static assert( is( ReturnType!(T[0]) == void ), "spawnFiber callback must be of type void" );
843         auto fib = _spawnFiber(false);
844         fib.params.fiberBody.set(args);
845         setFiberName(fib, "Fiber", args[0]);
846         return FiberHandle(fib);
847     }
848 
849     /**
850      * Spawn a new fiber for execution.
851      *
852      * Params:
853      *  F = The function or delegate to call inside the fiber.
854      *  args = The arguments for F
855      *
856      * Returns:
857      *  A FiberHandle to the newly created fiber.
858      */
859     @notrace FiberHandle spawnFiber(alias F)(Parameters!F args)
860     if (!isType!F) {
861         static assert( is( ReturnType!F == void ), "spawnFiber callback must be of type void" );
862         auto fib = _spawnFiber(false);
863         import std.algorithm: move;
864         // pragma(msg, genMoveArgument( args.length, "fib.params.fiberBody.set!F", "args" ) );
865         mixin( genMoveArgument( args.length, "fib.params.fiberBody.set!F", "args" ) );
866 
867         setFiberName(fib, F.mangleof, &F);
868         return FiberHandle(fib);
869     }
870 
871     /// Returns whether currently running fiber is the idle fiber.
872     @property bool isIdle() pure const nothrow @safe @nogc {
873         return thisFiber is idleFiber;
874     }
875 
876     /// Returns whether currently running fiber is the main fiber.
877     @property bool isMain() pure const nothrow @safe @nogc {
878         return thisFiber is mainFiber;
879     }
880 
881     /// Returns whether currently running fiber is a special (i.e. - non-user) fiber
882     @property bool isSpecialFiber() const nothrow @safe @nogc {
883         return thisFiber.flag!"SPECIAL";
884     }
885 
886     /// Returns a FiberHandle to the currently running fiber
887     @property FiberHandle currentFiberHandle() nothrow @safe @nogc {
888         // XXX This assert may be incorrect, but it is easier to remove an assert than to add one
889         DBG_ASSERT!"Should not blindly get fiber handle of special fiber %s"(!isSpecialFiber, currentFiberId);
890         return FiberHandle(thisFiber);
891     }
892     @property package ReactorFiber* currentFiberPtr() nothrow @safe @nogc {
893         return getCurrentFiberPtr(false);
894     }
895     @notrace private ReactorFiber* getCurrentFiberPtr(bool specialOkay) nothrow @safe @nogc {
896         // XXX This assert may be incorrect, but it is easier to remove an assert than to add one
897         ASSERT!"Should not blindly get fiber handle of special fibers %s"(specialOkay || !isSpecialFiber, currentFiberId);
898         return thisFiber;
899     }
900     /**
901       Returns the FiberId of the currently running fiber.
902 
903       You should almost never store the FiberId for later comparison or pass it to another fiber. Doing so risks having the current fiber
904       die and another one spawned with the same FiberId. If that's what you want to do, use currentFiberHandle instead.
905      */
906     @property FiberId currentFiberId() const nothrow @safe @nogc {
907         return thisFiber.identity;
908     }
909 
910     /// Returns the `FiberState` of the specified fiber.
911     FiberState getFiberState(FiberHandle fh) const nothrow @safe @nogc {
912         auto fiber = fh.get();
913 
914         if( fiber is null )
915             return FiberState.None;
916 
917         if( fiber.state==FiberState.Sleeping && fiber.flag!"SCHEDULED" )
918             return FiberState.Scheduled;
919 
920         return fiber.state;
921     }
922 
923     /**
924       Starts up the reactor.
925 
926       The reactor should already have at least one user fiber at that point, or it will start, but sit there and do nothing.
927 
928       This function "returns" only after the reactor is stopped and no more fibers are running.
929 
930       Returns:
931       The return value from `start` is the value passed to the `stop` function.
932      */
933     int start() {
934         _isReactorThread = true;
935         scope(exit) _isReactorThread = false;
936 
937         META!"Reactor started"();
938         assert( idleFiber !is null, "Reactor started without calling \"setup\" first" );
939         mainloop();
940         META!"Reactor stopped"();
941 
942         return reactorReturn;
943     }
944 
945     /**
946       Stop the reactor, killing all fibers.
947 
948       This will kill all running fibers and trigger a return from the original call to Reactor.start.
949 
950       Typically, this function call never returns (throws ReactorExit). However, if stop is called while already in the
      process of stopping, it will just return. It is, therefore, not wise to rely on that fact.
952 
953       Params:
954       reactorReturn = The return value to be returned from `start`
955      */
956     void stop(int reactorReturn = 0) @safe @nogc {
957         if( !isActive ) {
958             ERROR!"Reactor.stop called, but reactor is not running"();
959             return;
960         }
961 
962         INFO!"Stopping reactor with exit code %s"(reactorReturn);
963         this.reactorReturn = reactorReturn;
964 
965         _stopping = true;
966         if( !isMain() ) {
967             resumeSpecialFiber(mainFiber);
968         }
969 
970         throw mkEx!ReactorExit();
971     }
972 
973     /**
974       enter a no-fiber switch piece of code.
975 
976       If the code tries to switch away from the current fiber before leaveCriticalSection is called, the reactor will throw an assert.
977       This helps writing code that does interruption sensitive tasks without locks and making sure future changes don't break it.
978 
979       Critical sections are nestable.
980 
981       Also see `criticalSection` below.
982      */
983     void enterCriticalSection() nothrow @safe @nogc {
984         pragma(inline, true);
985         criticalSectionNesting++;
986     }
987 
988     /// leave the innermost critical section.
989     void leaveCriticalSection() nothrow @safe @nogc {
990         pragma(inline, true);
991         assert (criticalSectionNesting > 0);
992         criticalSectionNesting--;
993     }
994 
995     /// Reports whether execution is currently within a critical section
996     @property bool isInCriticalSection() const pure nothrow @safe @nogc {
997         return criticalSectionNesting > 0;
998     }
999 
1000     /** Make sure we are allowed to context switch from this point.
1001      *
1002      * This will be called automatically if an actual context switch is attempted. You might wish to call this function
1003      * explicitly, however, from contexts that $(I might) context switch, so that they fail even if they don't actually
1004      * attempt it, in accordance with the "fail early" doctrine.
1005      */
1006     void assertMayContextSwitch(string message = "Simulated context switch") nothrow @safe @nogc {
1007         pragma(inline, true);
1008         ASSERT!"Context switch from outside the reactor thread: %s"(isReactorThread, message);
1009         ASSERT!"Context switch while inside a critical section: %s"(!isInCriticalSection, message);
1010     }
1011 
1012     /**
1013      * Return a RAII object handling a critical section
1014      *
1015      * The function enters a critical section. Unlike enterCriticalSection, however, there is no need to explicitly call
1016      * leaveCriticalSection. Instead, the function returns a variable that calls leaveCriticalSection when it goes out of scope.
1017      *
1018      * There are two advantages for using criticalSection over enter/leaveCriticalSection. The first is for nicer scoping:
1019      * ---
1020      * with(theReactor.criticalSection) {
1021      *   // Code in critical section goes here
1022      * }
1023      * ---
1024      *
1025      * The second case is if you need to switch, within the same code, between critical section and none:
1026      * ---
1027      * auto cs = theReactor.criticalSection;
1028      *
1029      * // Some code
1030      * cs.leave();
1031      * // Some code that sleeps
1032      * cs.enter();
1033      *
1034      * // The rest of the critical section code
1035      * ---
1036      *
1037      * The main advantage over enter/leaveCriticalSection is that this way, the critical section never leaks. Any exception thrown, either
1038      * from the code inside the critical section or the code temporary out of it, will result in the correct number of leaveCriticalSection
1039      * calls to zero out the effect.
1040      *
1041      * Also note that there is no need to call leave at the end of the code. Cs's destructor will do it for us $(B if necessary).
1042      */
1043     @property auto criticalSection() nothrow @safe @nogc {
1044         pragma(inline, true);
1045         static struct CriticalSection {
1046         private:
1047             bool owner = true;
1048 
1049         public:
1050             @disable this(this);
1051             ~this() nothrow @safe @nogc {
1052                 if( owner )
1053                     leave();
1054             }
1055 
1056             void enter() nothrow @safe @nogc {
1057                 DBG_ASSERT!"The same critical section was activated twice"(!owner);
1058                 theReactor.enterCriticalSection();
1059                 owner = true;
1060             }
1061 
1062             void leave() nothrow @safe @nogc {
1063                 DBG_ASSERT!"Asked to leave critical section not owned by us"(owner);
1064                 theReactor.leaveCriticalSection();
1065                 owner = false;
1066             }
1067         }
1068         enterCriticalSection();
1069         return CriticalSection();
1070     }
1071     unittest {
1072         testWithReactor({
1073                 {
1074                     with( theReactor.criticalSection ) {
1075                         assertThrows!AssertError( theReactor.yield() );
1076                     }
1077                 }
1078 
1079                 theReactor.yield();
1080                 });
1081     }
1082 
1083     /**
1084      * Temporarily surrender the CPU for other fibers to run.
1085      *
1086      * Unlike suspend, the current fiber will automatically resume running after any currently scheduled fibers are finished.
1087      */
1088     @notrace void yield() @safe @nogc {
1089         // TODO both scheduled and running is not a desired state to be in other than here.
1090         resumeFiber(thisFiber);
1091         suspendCurrentFiber();
1092     }
1093 
1094     /**
1095      * Returns whether the fiber is already running for a long time.
1096      *
1097      * Fibers that run for too long prevent other fibers from operating properly. On the other hand, fibers that initiate
1098      * a context switch needlessly load the system with overhead.
1099      *
1100      * This function reports whether the fiber is already running more than the desired time.
1101      *
1102      * The base time used is taken from `OpenOptions.maxDesiredRunTime`.
1103      *
1104      * Params:
1105      * tolerance = A multiplier for the amount of acceptable run time. Specifying 4 here will give you 4 times as much
1106      * time to run before a context switch is deemed necessary.
1107      */
1108     @notrace bool shouldYield(uint tolerance = 1) const nothrow @safe @nogc {
1109         if( _stopping )
1110             return true;
1111 
1112         auto now = TscTimePoint.hardNow();
1113         return (now - fiberRunStartTime) > tolerance*optionsInEffect.maxDesiredRunTime;
1114     }
1115 
1116     /**
1117      Perform yield if fiber is running long enough.
1118 
1119      Arguments and meaning are the same as for `shouldYield`.
1120 
1121      Returns:
1122      Whether a yield actually took place.
1123      */
1124     @notrace bool considerYield(uint tolerance = 1) @safe @nogc {
1125         if( shouldYield(tolerance) ) {
1126             yield();
1127             return true;
1128         }
1129 
1130         return false;
1131     }
1132 
1133     /**
1134      * Give a fiber temporary priority in execution.
1135      *
1136      * Setting fiber priority means that the next time this fiber is scheduled, it will be scheduled ahead of other
1137      * fibers already scheduled to be run.
1138      *
1139      * Returns a RAII object that sets the priority back when destroyed. Typically, that happens at the end of the scope.
1140      *
1141      * Can be used as:
1142      * ---
1143      * with(boostFiberPriority()) {
1144      *    // High priority stuff goes here
1145      * }
1146      * ---
1147      */
1148     auto boostFiberPriority() nothrow @safe @nogc {
1149         static struct FiberPriorityRAII {
1150         private:
1151             FiberHandle fh;
1152 
1153         public:
1154             @disable this(this);
1155             this(FiberHandle fh) @safe @nogc nothrow {
1156                 this.fh = fh;
1157             }
1158 
1159             ~this() @safe @nogc nothrow {
1160                 if( fh.isValid ) {
1161                     ASSERT!"Cannot move priority watcher between fibers (opened on %s)"(
1162                             fh == theReactor.currentFiberHandle, fh.fiberId);
1163 
1164                     ASSERT!"Trying to reset priority which is not set"( theReactor.thisFiber.flag!"PRIORITY" );
1165                     theReactor.thisFiber.flag!"PRIORITY" = false;
1166                 }
1167             }
1168         }
1169 
1170         ASSERT!"Cannot ask to prioritize a non-user fiber"(!isSpecialFiber);
1171         ASSERT!"Asked to prioritize a fiber %s which is already high priority"(
1172                 !thisFiber.flag!"PRIORITY", currentFiberId );
1173         DEBUG!"Setting fiber priority"();
1174         thisFiber.flag!"PRIORITY" = true;
1175 
1176         return FiberPriorityRAII(currentFiberHandle);
1177     }
1178 
1179     /// Handle used to manage registered timers
1180     struct TimerHandle {
1181     private:
1182         TimedCallback* callback;
1183         TimedCallbackGeneration generation;
1184 
1185     public:
1186 
1187         this(TimedCallback* callback) nothrow @safe @nogc {
1188             DBG_ASSERT!"Constructing TimedHandle from null callback"(callback !is null);
1189             this.callback = callback;
1190             this.generation = callback.generation;
1191         }
1192 
1193         /// Returns whether the handle was used
1194         ///
1195         /// returns:
1196         /// `true` if the handle was set. `false` if it is still in its init value.
1197         @property bool isSet() const pure nothrow @safe @nogc {
1198             return callback !is null;
1199         }
1200 
1201         /// Returns whether the handle describes a currently registered task
1202         @property bool isValid() const nothrow @safe @nogc {
1203             return
1204                     isSet() &&
1205                     theReactor.isOpen &&
1206                     callback._owner !is null &&
1207                     generation == callback.generation;
1208         }
1209 
1210         /// Revert the handle to init value, forgetting the timer it points to
1211         ///
1212         /// This call will $(B not) cancel the actual timer. Use `cancelTimer` for that.
1213         @notrace void reset() nothrow @safe @nogc {
1214             callback = null;
1215             generation = TimedCallbackGeneration.invalid;
1216         }
1217 
1218         /// Cancel a currently registered timer
1219         void cancelTimer() nothrow @safe @nogc {
1220             if( isValid ) {
1221                 theReactor.cancelTimerInternal(this);
1222             }
1223 
1224             reset();
1225         }
1226     }
1227 
1228     /**
1229      * Registers a timer task.
1230      *
1231      * Params:
1232      * F = the callable to be invoked when the timer expires
     * timeout = when the timer is to be called
1234      * params = the parameters to call F with
1235      *
1236      * Returns:
1237      * A handle to the just registered timer.
1238      */
1239     TimerHandle registerTimer(alias F)(Timeout timeout, Parameters!F params) nothrow @safe @nogc {
1240         TimedCallback* callback = allocTimedCallback();
1241         callback.closure.set(&F, params);
1242         callback.timePoint = timeout.expiry;
1243         callback.intervalCycles = 0;
1244 
1245         timeQueue.insert(callback);
1246 
1247         return TimerHandle(callback);
1248     }
1249 
1250     /// ditto
1251     TimerHandle registerTimer(alias F)(Duration timeout, Parameters!F params) nothrow @safe @nogc {
1252         return registerTimer!F(Timeout(timeout), params);
1253     }
1254 
1255     /**
1256      * Register a timer callback
1257      *
1258      * Same as `registerTimer!F`, except with the callback as an argument. Callback cannot accept parameters.
1259      */
1260     TimerHandle registerTimer(T)(Timeout timeout, T dg) nothrow @safe @nogc {
1261         TimedCallback* callback = allocTimedCallback();
1262         callback.closure.set(dg);
1263         callback.timePoint = timeout.expiry;
1264         callback.intervalCycles = 0;
1265 
1266         timeQueue.insert(callback);
1267 
1268         return TimerHandle(callback);
1269     }
1270 
1271     /// ditto
1272     TimerHandle registerTimer(T)(Duration timeout, T dg) nothrow @safe @nogc {
1273         return registerTimer!T(Timeout(timeout), dg);
1274     }
1275 
1276     /**
1277      * registers a timer that will repeatedly trigger at set intervals.
1278      *
1279      * You do not control precisely when the callback is invoked, only how often. The invocations are going to be evenly
1280      * spaced out (best effort), but the first invocation might be almost immediately after the call or a whole
1281      * `interval` after.
1282      *
1283      * You can use the `firstRun` argument to control when the first invocation is going to be (but the same rule will
1284      * still apply to the second one).
1285      *
1286      * Params:
1287      *  interval = the frequency with which the callback will be called.
1288      *  dg = the callback to invoke
1289      *  F = an alias to the function to be called
1290      *  params = the arguments to pass to F on each invocation
1291      *  firstRun = if supplied, directly sets when is the first time the timer shall run. The value does not have any
1292      *      special constraints.
1293      */
1294     TimerHandle registerRecurringTimer(Duration interval, void delegate() dg) nothrow @safe @nogc {
1295         TimedCallback* callback = _registerRecurringTimer(interval);
1296         callback.closure.set(dg);
1297         return TimerHandle(callback);
1298     }
1299 
1300     /// ditto
1301     TimerHandle registerRecurringTimer(alias F)(Duration interval, Parameters!F params) nothrow @safe @nogc {
1302         TimedCallback* callback = _registerRecurringTimer(interval);
1303         callback.closure.set(&F, params);
1304         return TimerHandle(callback);
1305     }
1306 
1307     /// ditto
1308     TimerHandle registerRecurringTimer(Duration interval, void delegate() dg, Timeout firstRun) nothrow @safe @nogc {
1309         TimedCallback* callback = allocRecurringTimer(interval);
1310         callback.timePoint = firstRun.expiry;
1311         callback.closure.set(dg);
1312 
1313         timeQueue.insert(callback);
1314 
1315         return TimerHandle(callback);
1316     }
1317 
1318     /// ditto
1319     TimerHandle registerRecurringTimer(alias F)(Duration interval, Parameters!F params, Timeout firstRun) nothrow @safe @nogc {
1320         TimedCallback* callback = allocRecurringTimer(interval);
1321         callback.timePoint = firstRun.expiry;
1322         callback.closure.set(&F, params);
1323 
1324         timeQueue.insert(callback);
1325 
1326         return TimerHandle(callback);
1327     }
1328 
1329     private void cancelTimerInternal(TimerHandle handle) nothrow @safe @nogc {
1330         DBG_ASSERT!"cancelTimerInternal called with invalid handle"( handle.isValid );
1331         timeQueue.cancel(handle.callback);
1332         timedCallbacksPool.release(handle.callback);
1333     }
1334 
1335     /**
     * Schedule a callback for out-of-band immediate execution.
1337      *
1338      * For all intents and purposes, `call` is identical to `registerTimer` with an expired timeout.
1339      */
1340     TimerHandle call(alias F)(Parameters!F params) nothrow @safe @nogc {
1341         return registerTimer!F(Timeout.elapsed, params);
1342     }
1343 
1344     /// ditto
1345     TimerHandle call(T)(T dg) nothrow @safe @nogc {
1346         return registerTimer(Timeout.elapsed, dg);
1347     }
1348 
1349     /// Suspend the current fiber for a specified amount of time
1350     void sleep(Duration duration) @safe @nogc {
1351         sleep(Timeout(duration));
1352     }
1353 
1354     /// ditto
1355     void sleep(Timeout until) @safe @nogc {
1356         assert(until != Timeout.init, "sleep argument uninitialized");
1357         auto timerHandle = registerTimer!resumeFiber(until, currentFiberHandle, false);
1358         scope(failure) timerHandle.cancelTimer();
1359 
1360         suspendCurrentFiber();
1361     }
1362 
1363     /**
1364       Resume a suspended fiber
1365 
1366       You are heartily encouraged not to use this function directly. In almost all cases, it is better to use one of the
1367       synchronization primitives instead.
1368 
1369       Params:
1370       handle = the handle of the fiber to be resumed.
1371       priority = If true, schedule the fiber before all currently scheduled fibers. If the fiber is already scheduled
1372         (`getFiberState` returns `Scheduled`), this will move it to the top of the queue.
1373      */
1374     @notrace void resumeFiber(FiberHandle handle, bool priority = false) nothrow @safe @nogc {
1375         auto fiber = handle.get();
1376 
1377         if( fiber !is null )
1378             resumeFiber(handle.get(), priority);
1379     }
1380 
1381     /**
1382       Suspend the current fiber
1383 
1384       If `timeout` is given and expires, the suspend will throw a `TimeoutExpired` exception.
1385 
1386       You are heartily encouraged not to use this function directly. In almost all cases, it is better to use one of the
1387       synchronization primitives instead.
1388      */
1389     @notrace void suspendCurrentFiber(Timeout timeout) @trusted @nogc {
1390         if (timeout == Timeout.infinite)
1391             return suspendCurrentFiber();
1392 
1393         assertMayContextSwitch("suspendCurrentFiber");
1394 
1395         TimerHandle timeoutHandle;
1396         scope(exit) timeoutHandle.cancelTimer();
1397         bool timeoutExpired;
1398 
1399         if (timeout == Timeout.elapsed) {
1400             throw mkEx!TimeoutExpired;
1401         }
1402 
1403         static void resumer(FiberHandle fibHandle, TimerHandle* cookie, bool* timeoutExpired) nothrow @safe @nogc{
1404             *cookie = TimerHandle.init;
1405             ReactorFiber* fib = fibHandle.get;
1406             assert( fib !is null, "Fiber disappeared while suspended with timer" );
1407 
1408             // Throw TimeoutExpired only if we're the ones who resumed the fiber. this prevents a race when
1409             // someone else had already woken the fiber, but it just didn't get time to run while the timer expired.
1410             // this probably indicates fibers hogging the CPU for too long (starving others)
1411             *timeoutExpired = ! fib.flag!"SCHEDULED";
1412 
1413             /+
1414                     if (! *timeoutExpired)
1415                         fib.WARN_AS!"#REACTOR fiber resumer invoked, but fiber already scheduled (starvation): %s scheduled, %s pending"(
1416                                 theReactor.scheduledFibers.length, theReactor.pendingFibers.length);
1417             +/
1418 
1419             theReactor.resumeFiber(fib);
1420         }
1421 
1422         timeoutHandle = registerTimer!resumer(timeout, currentFiberHandle, &timeoutHandle, &timeoutExpired);
1423         switchToNext();
1424 
1425         if( timeoutExpired )
1426             throw mkEx!TimeoutExpired();
1427     }
1428 
1429     /// ditto
1430     @notrace void suspendCurrentFiber() @safe @nogc {
1431          switchToNext();
1432     }
1433 
1434     /**
1435      * run a function inside a different thread.
1436      *
1437      * Runs a function inside a thread. Use this function to run CPU bound processing without freezing the other fibers.
1438      *
1439      * `Fini`, if provided, will be called with the same parameters in the reactor thread once the thread has finished.
1440      * `Fini` will be called unconditionally, even if the fiber that launched the thread terminates before the thread
1441      * has finished.
1442      *
1443      * The `Fini` function runs within a `criticalSection`, and must not sleep.
1444      *
1445      * Returns:
1446      * The return argument from the delegate is returned by this function.
1447      *
1448      * Throws:
1449      * Will throw TimeoutExpired if the timeout expires.
1450      *
1451      * Will rethrow whatever the thread throws in the waiting fiber.
1452      */
1453     auto deferToThread(alias F, alias Fini = null)(Parameters!F args, Timeout timeout = Timeout.infinite) @nogc {
1454         DBG_ASSERT!"deferToThread called but thread deferral isn't enabled in the reactor"(
1455                 optionsInEffect.threadDeferralEnabled);
1456         return threadPool.deferToThread!(F, Fini)(timeout, args);
1457     }
1458 
1459     /// ditto
1460     auto deferToThread(F)(scope F dlg, Timeout timeout = Timeout.infinite) @nogc {
1461         DBG_ASSERT!"deferToThread called but thread deferral isn't enabled in the reactor"(optionsInEffect.threadDeferralEnabled);
1462         static auto glueFunction(F dlg) {
1463             return dlg();
1464         }
1465 
1466         return threadPool.deferToThread!glueFunction(timeout, dlg);
1467     }
1468 
1469     /**
1470      * forward an exception to another fiber
1471      *
1472      * This function throws an exception in another fiber. The fiber will be scheduled to run.
1473      *
1474      * There is a difference in semantics between the two forms. The first form throws an identical copy of the exception in the
1475      * target fiber. The second form forms a new exception to be thrown in that fiber.
1476      *
1477      * One place where this difference matters is with the stack trace the thrown exception will have. With the first form, the
1478      * stack trace will be the stack trace `ex` has, wherever it is (usually not in the fiber in which the exception was thrown).
1479      * With the second form, the stack trace will be of the target fiber, which means it will point back to where the fiber went
1480      * to sleep.
1481      */
1482     bool throwInFiber(FiberHandle fHandle, Throwable ex) nothrow @safe @nogc {
1483         if( !fHandle.isValid )
1484             return false;
1485 
1486         ExcBuf* fiberEx = prepThrowInFiber(fHandle, false);
1487 
1488         if( fiberEx is null )
1489             return false;
1490 
1491         fiberEx.set(ex);
1492         auto fib = fHandle.get();
1493         resumeFiber(fib, true);
1494         return true;
1495     }
1496 
1497     /// ditto
1498     bool throwInFiber(T : Throwable, string file = __FILE_FULL_PATH__, size_t line = __LINE__, A...)
1499             (FiberHandle fHandle, auto ref A args) nothrow @safe @nogc
1500     {
1501         pragma(inline, true);
1502         if( !fHandle.isValid )
1503             return false;
1504 
1505         ExcBuf* fiberEx = prepThrowInFiber(fHandle, true);
1506 
1507         if( fiberEx is null )
1508             return false;
1509 
1510         fiberEx.construct!T(file, line, false, args);
1511         auto fib = fHandle.get();
1512         resumeFiber(fib, true);
1513         return true;
1514     }
1515 
1516     /** Request that a GC collection take place ASAP
1517      *  Params:
1518      *  waitForCollection = whether to wait for the collection to finish before returning.
1519      */
1520     void requestGCCollection(bool waitForCollection = true) @safe @nogc {
1521         if( theReactor._stopping )
1522             return;
1523 
1524         DEBUG!"Requesting explicit GC collection"();
1525         requestGCCollectionInternal(true);
1526 
1527         if( waitForCollection ) {
1528             DBG_ASSERT!"Special fiber must request synchronous GC collection"( !isSpecialFiber );
1529             yield();
1530         }
1531     }
1532 
1533     private void requestGCCollectionInternal(bool force) nothrow @safe @nogc {
1534         _gcCollectionNeeded = true;
1535         _gcCollectionForce = force;
1536         if( theReactor.currentFiberId != MainFiberId ) {
1537             theReactor.resumeSpecialFiber(theReactor.mainFiber);
1538         }
1539     }
1540 
1541     /// Property for disabling/enabling the hang detector.
1542     ///
1543     /// The hang detector must be configured during `setup` by setting `OpenOptions.hangDetectorTimeout`.
1544     @property bool hangDetectorEnabled() pure const nothrow @safe @nogc {
1545         return _hangDetectorEnabled;
1546     }
1547 
1548     /// ditto
1549     @property void hangDetectorEnabled(bool enabled) pure nothrow @safe @nogc {
1550         ASSERT!"Cannot enable an unconfigured hang detector"(
1551                 !enabled || optionsInEffect.hangDetectorTimeout !is Duration.zero);
1552         _hangDetectorEnabled = enabled;
1553     }
1554 
    /**
     * Iterate all fibers
     *
     * Returns:
     * An input range of `FiberHandle`. Scanning starts at slot `NUM_SPECIAL_FIBERS`, and the number of elements is
     * taken from `reactorStats.numUsedFibers` at the time of the call. The range stores the result of
     * `theReactor.criticalSection()` as a member for as long as it exists.
     */
    auto iterateFibers() const nothrow @safe @nogc {
        static struct FibersIterator {
        private:
            // Number of living fibers not yet popped from the range
            uint numLivingFibers;
            // Slot currently pointed at. The scan begins right after the special fibers' slots
            FiberIdx idx = NUM_SPECIAL_FIBERS;
            // NOTE(review): presumably keeps context switches from happening while the range is alive - confirm
            ReturnType!(Reactor.criticalSection) criticalSection;

            this(uint numFibers) {
                numLivingFibers = numFibers;
                this.criticalSection = theReactor.criticalSection();

                // Position idx on the first living fiber, if there is one to be found
                if( numLivingFibers>0 ) {
                    findNextFiber();
                }
            }

            // Advance idx until it points at a living fiber. Must only be called when one is known to exist.
            void findNextFiber() @safe @nogc nothrow {
                while( ! to!(ReactorFiber*)(idx).isAlive ) {
                    idx++;
                    DBG_ASSERT!"Asked for next living fiber but none was found"(
                            idx<theReactor.optionsInEffect.numFibers);
                }
            }
        public:
            // Range is exhausted once all counted fibers were reported
            @property bool empty() const pure @safe @nogc nothrow {
                return numLivingFibers==0;
            }

            void popFront() @safe @nogc nothrow {
                ASSERT!"Popping fiber from empty list"(!empty);
                numLivingFibers--;
                idx++;

                // Only scan forward if there is still a living fiber left to find
                if( !empty )
                    findNextFiber;
            }

            @property FiberHandle front() @safe @nogc nothrow {
                auto fib = &theReactor.allFibers[idx.value];
                DBG_ASSERT!"Scanned fiber %s is not alive"(fib.isAlive, idx);

                return FiberHandle(fib);
            }
        }

        return FibersIterator(cast(uint) reactorStats.numUsedFibers);
    }
1603 
1604     auto iterateScheduledFibers(FiberPriorities priority) nothrow @safe @nogc {
1605         @notrace static struct Range {
1606             private typeof(scheduledFibersNormal.range()) fibersRange;
1607 
1608             @property FiberHandle front() nothrow @nogc {
1609                 return FiberHandle(fibersRange.front);
1610             }
1611 
1612             @property bool empty() const pure nothrow @nogc {
1613                 return fibersRange.empty;
1614             }
1615 
1616             @notrace void popFront() nothrow {
1617                 fibersRange.popFront();
1618             }
1619         }
1620 
1621         with(FiberPriorities) final switch(priority) {
1622         case NORMAL:
1623             return scheduledFibersNormal.range();
1624         case HIGH:
1625             return scheduledFibersHigh.range();
1626         case IMMEDIATE:
1627             return scheduledFibersImmediate.range();
1628         }
1629 
1630         assert(false, "Priority must be member of FiberPriorities");
1631     }
1632 
1633     /**
1634      * Set a fiber name
1635      *
1636      * Used by certain diagnostic functions to distinguish the different fiber types and create histograms.
1637      * Arguments bear no specific meaning, but default to describing the function used to start the fiber.
1638      */
1639     @notrace void setFiberName(FiberHandle fh, string name, void *ptr) nothrow @safe @nogc {
1640         DBG_ASSERT!"Trying to set fiber name of an invalid fiber"(fh.isValid);
1641         setFiberName(fh.get, name, ptr);
1642     }
1643 
1644     /// ditto
1645     @notrace void setFiberName(T)(FiberHandle fh, string name, scope T dlg) nothrow @safe @nogc if( isDelegate!T ) {
1646         setFiberName(fh, name, dlg.ptr);
1647     }
1648 
1649     /**
1650      * Temporarily change the fiber's name
1651      *
1652      * Meaning of arguments is as for `setFiberName`.
1653      *
1654      * returns:
1655      * A voldemort type whose destructor returns the fiber name to the one it had before. It also has a `release`
1656      * function for returning the name earlier.
1657      */
1658     auto pushFiberName(string name, void *ptr) nothrow @safe @nogc {
1659         static struct PrevName {
1660         private:
1661             string name;
1662             void* ptr;
1663 
1664         public:
1665             @disable this();
1666             @disable this(this);
1667 
1668             this(string name, void* ptr) nothrow @safe @nogc {
1669                 this.name = name;
1670                 this.ptr = ptr;
1671             }
1672 
1673             ~this() nothrow @safe @nogc {
1674                 if( name !is null || ptr !is null )
1675                     release();
1676             }
1677 
1678             void release() nothrow @safe @nogc {
1679                 theReactor.setFiberName( theReactor.thisFiber, this.name, this.ptr );
1680 
1681                 name = null;
1682                 ptr = null;
1683             }
1684         }
1685 
1686         auto prevName = PrevName(name, ptr);
1687         setFiberName( theReactor.thisFiber, name, ptr );
1688 
1689         import std.algorithm: move;
1690         return move(prevName);
1691     }
1692 
1693     /// ditto
1694     @notrace auto pushFiberName(T)(string name, scope T dlg) nothrow @safe @nogc if( isDelegate!T ) {
1695         return pushFiberName(name, dlg.ptr);
1696     }
1697 
1698     /// Retrieve the fiber name set by `setFiberName`
1699     @notrace string getFiberName(FiberHandle fh) nothrow @safe @nogc {
1700         DBG_ASSERT!"Trying to get fiber name of an invalid fiber"(fh.isValid);
1701         return fh.get.params.fiberName;
1702     }
1703 
1704     /// ditto
1705     @notrace string getFiberName() nothrow @safe @nogc {
1706         return thisFiber.params.fiberName;
1707     }
1708 
1709     /// Retrieve the fiber pointer set by `setFiberName`
1710     @notrace void* getFiberPtr(FiberHandle fh) nothrow @safe @nogc {
1711         DBG_ASSERT!"Trying to get fiber pointer of an invalid fiber"(fh.isValid);
1712         return fh.get.params.fiberPtr;
1713     }
1714 
1715     /// ditto
1716     @notrace void* getFiberPtr() nothrow @safe @nogc {
1717         return thisFiber.params.fiberPtr;
1718     }
1719 
1720     /**
1721      * Wait until given fiber finishes
1722      */
1723     @notrace void joinFiber(FiberHandle fh, Timeout timeout = Timeout.infinite) @safe @nogc {
1724         ReactorFiber* fiber = fh.get();
1725 
1726         if( fiber is null ) {
1727             assertMayContextSwitch();
1728             return;
1729         }
1730 
1731         fiber.params.joinWaiters.wait(timeout);
1732         DBG_ASSERT!"Fiber handle %s is valid after signalling done"(!fh.isValid, fh);
1733     }
1734 
1735     /// Report the current reactor statistics.
1736     @property Stats reactorStats() const nothrow @safe @nogc {
1737         Stats ret = stats;
1738 
1739         foreach( FiberIdx.UnderlyingType i; 0..NUM_SPECIAL_FIBERS ) {
1740             auto fiberIdx = FiberIdx(i);
1741             auto fiber = to!(ReactorFiber*)(fiberIdx);
1742             DBG_ASSERT!"%s is in state %s with histogram of 0"(
1743                     ret.fibersHistogram[fiber.state]>0, fiber.identity, fiber.state );
1744             ret.fibersHistogram[fiber.state]--;
1745         }
1746 
1747         return ret;
1748     }
1749 
1750 private:
    /// The reactor's record of the current fiber (`_thisFiber`).
    ///
    /// Checks, through `DBG_ASSERT`, that the reactor is running. Explicitly `package`, despite being placed under the
    /// `private:` label.
    package @property inout(ReactorFiber)* thisFiber() inout nothrow pure @safe @nogc {
        DBG_ASSERT!"No current fiber as reactor was not started"(isRunning);
        return _thisFiber;
    }
1755 
1756     @notrace TimedCallback* allocTimedCallback() nothrow @safe @nogc {
1757         DBG_ASSERT!"Registering timer on non-open reactor"(isOpen);
1758         auto ret = timedCallbacksPool.alloc();
1759         ret.generation = timedCallbackGeneration.getNext();
1760 
1761         return ret;
1762     }
1763 
1764     TimedCallback* allocRecurringTimer(Duration interval) nothrow @safe @nogc {
1765         TimedCallback* callback = allocTimedCallback();
1766         if( interval<optionsInEffect.timerGranularity )
1767             interval = optionsInEffect.timerGranularity;
1768 
1769         callback.intervalCycles = TscTimePoint.toCycles(interval);
1770         return callback;
1771     }
1772 
1773     TimedCallback* _registerRecurringTimer(Duration interval) nothrow @safe @nogc {
1774         TimedCallback* callback = allocRecurringTimer(interval);
1775         rescheduleRecurringTimer(callback);
1776         return callback;
1777     }
1778 
1779     @property bool shouldRunTimedCallbacks() nothrow @safe @nogc {
1780         return timeQueue.cyclesTillNextEntry(TscTimePoint.hardNow()) == 0;
1781     }
1782 
    /**
     * Pick the next fiber to run, and switch to it.
     *
     * Selection order: the immediate queue first; then the high priority queue, as long as fewer than
     * `HIGH_PRIORITY_SCHEDULES_RATIO` high priority fibers were picked since the last normal priority pick, or the
     * normal queue is empty; then the normal queue; and when all queues are empty, the idle fiber.
     *
     * The current fiber moves from `Running` to `Sleeping` (or from `Done` to `None`). The selected fiber becomes
     * `Running`. Also emits the hogger warning for the outgoing fiber and makes sure the main fiber gets scheduled
     * when timed callbacks are due.
     */
    void switchToNext() @safe @nogc {
        //DEBUG!"SWITCH out of %s"(thisFiber.identity);
        assertMayContextSwitch("Context switch");

        stats.numContextSwitches++;

        // in source fiber
        {
            auto now = TscTimePoint.hardNow;
            // Special fibers are exempt from hogger detection
            if( !thisFiber.flag!"SPECIAL" ) {
                auto fiberRunTime =  now - fiberRunStartTime;
                if( fiberRunTime >= optionsInEffect.hoggerWarningThreshold ) {
                    WARN!"#HOGGER detected: Fiber %s ran for %sms"(thisFiber.identity, fiberRunTime.total!"msecs");
                    // TODO: Add dumping of stack trace
                }
            }

            // Start measuring the run time of whichever fiber runs next
            fiberRunStartTime = now;

            // Timers are due and main is not yet queued: put it at the head of the immediate queue
            if (thisFiber !is mainFiber && !mainFiber.flag!"SCHEDULED" && shouldRunTimedCallbacks()) {
                resumeSpecialFiber(mainFiber);
            }

            ReactorFiber* nextFiber;

            if( !scheduledFibersImmediate.empty ) {
                nextFiber = scheduledFibersImmediate.popHead();
            } else if(
                    !scheduledFibersHigh.empty &&
                    ( highPrioritySchedules<HIGH_PRIORITY_SCHEDULES_RATIO || scheduledFibersNormal.empty))
            {
                nextFiber = scheduledFibersHigh.popHead();
                highPrioritySchedules++;
            } else if( !scheduledFibersNormal.empty ) {
                nextFiber = scheduledFibersNormal.popHead();
                // We only get here with high priority fibers pending if the ratio limit was reached
                if( !scheduledFibersHigh.empty ) {
                    ERROR!"Scheduled normal priority fiber %s over high priority %s to prevent starvation"(
                            nextFiber.identity, scheduledFibersHigh.head.identity);
                }

                highPrioritySchedules = 0;
            } else {
                // Nothing to run. The idle fiber is never in a queue, so mark it scheduled by hand to satisfy
                // the check below.
                DBG_ASSERT!"Idle fiber scheduled but all queues empty"( !idleFiber.flag!"SCHEDULED" );
                nextFiber = idleFiber;
                idleFiber.flag!"SCHEDULED" = true;
                nothingScheduled = true;
            }

            // Update the state of the fiber we are leaving
            if( thisFiber.state==FiberState.Running )
                thisFiber.state = FiberState.Sleeping;
            else {
                assertEQ( thisFiber.state, FiberState.Done, "Fiber is in incorrect state" );
                thisFiber.state = FiberState.None;
            }

            DBG_ASSERT!"Couldn't decide on a fiber to schedule"(nextFiber !is null);

            ASSERT!"Next fiber %s is not marked scheduled" (nextFiber.flag!"SCHEDULED", nextFiber.identity);
            nextFiber.flag!"SCHEDULED" = false;
            DBG_ASSERT!"%s is in state %s, should be Sleeping or Starting"(
                    nextFiber.state==FiberState.Sleeping || nextFiber.state==FiberState.Starting,
                    nextFiber.identity, nextFiber.state);
            nextFiber.state = FiberState.Running;

            // DEBUG!"Switching %s => %s"(thisFiber.identity, nextFiber.identity);
            // If the chosen fiber is the one already running there is nothing to switch
            if (thisFiber !is nextFiber) {
                // make the switch
                switchTo(nextFiber);
            }
        }
    }
1854 
    /**
     * Perform the actual context switch from the current fiber to `nextFiber`.
     *
     * `_thisFiber` is updated before the switch. The code after `currentFiber.switchTo` runs only once something
     * switches back to the fiber that called this function, at which point `thisFiber.switchInto()` is called on it.
     *
     * Params:
     * nextFiber = the fiber to switch to.
     */
    void switchTo(ReactorFiber* nextFiber) @safe @nogc {
        // Fetch the outgoing fiber before _thisFiber gets replaced
        auto currentFiber = thisFiber;
        _thisFiber = nextFiber;

        currentFiber.switchTo(nextFiber);

        // After returning
        /+
          Important note:
          Any code you place here *must* be replicated at the beginning of ReactorFiber.wrapper. Fibers launched
          for the first time do not return from `switchTo` above.
         +/

        // DEBUG!"SWITCH into %s"(thisFiber.identity);

        // This might throw, so it needs to be the last thing we do
        thisFiber.switchInto();
    }
1873 
    /**
     * Retire the current (non special) fiber: put it at the head of the free fibers list and switch away.
     *
     * The call to `switchToNext` returns when the fiber is switched into again.
     *
     * Returns:
     * `true` if the fiber was woken up with a `FiberInterrupt`, `false` otherwise.
     * NOTE(review): judging by the variable name, `true` tells the caller to skip the fiber body - confirm at caller.
     */
    bool fiberTerminated() nothrow @notrace {
        ASSERT!"special fibers must never terminate" (!thisFiber.flag!"SPECIAL");

        freeFibers.prepend(thisFiber);

        bool skipBody = false;
        try {
            // Wait for next incarnation of fiber
            switchToNext();
        } catch (FiberInterrupt ex) {
            INFO!"Fiber %s killed by FiberInterrupt exception %s"(currentFiberId, ex.msg);
            skipBody = true;
        } catch (Throwable ex) {
            // Anything else is handed over to the main fiber. forwardExceptionToMain is not expected to return.
            ERROR!"switchToNext on %s fiber %s failed with exception %s"(
                    thisFiber.state==FiberState.Running ? "just starting" : "dead", currentFiberId, ex.msg);
            theReactor.forwardExceptionToMain(ex);
            assert(false);
        }

        return skipBody;
    }
1895 
1896     void resumeSpecialFiber(ReactorFiber* fib) nothrow @safe @nogc {
1897         DBG_ASSERT!"Asked to resume special fiber %s which isn't marked special" (fib.flag!"SPECIAL", fib.identity);
1898         DBG_ASSERT!"Asked to resume special fiber %s with no body set" (fib.flag!"CALLBACK_SET", fib.identity);
1899         DBG_ASSERT!"Special fiber %s scheduled not in head of list" (
1900                 !fib.flag!"SCHEDULED" || scheduledFibersImmediate.head is fib, fib.identity);
1901 
1902         if (!fib.flag!"SCHEDULED") {
1903             fib.flag!"SCHEDULED" = true;
1904             scheduledFibersImmediate.prepend(fib);
1905             nothingScheduled = false;
1906         }
1907     }
1908 
    /**
     * Schedule a regular (non special) fiber for running.
     *
     * Params:
     * fib = the fiber to schedule. Must have its callback set.
     * immediate = if true the fiber goes to the immediate queue, and is moved there even if it is already scheduled
     *     in another queue. Otherwise the queue is chosen by the fiber's `PRIORITY` flag, and an already scheduled
     *     fiber is left where it is.
     */
    void resumeFiber(ReactorFiber* fib, bool immediate = false) nothrow @safe @nogc {
        DBG_ASSERT!"Cannot resume a special fiber %s using the standard resumeFiber" (!fib.flag!"SPECIAL", fib.identity);
        ASSERT!"resumeFiber called on %s, which does not have a callback set"(fib.flag!"CALLBACK_SET", fib.identity);

        typeof(scheduledFibersNormal)* queue;
        if( immediate ) {
            // Scheduled in a non-immediate queue: pull it out, so the code below re-queues it as immediate
            if( fib.flag!"SCHEDULED" && fib !in scheduledFibersImmediate ) {
                fib._owner.remove(fib);
                fib.flag!"SCHEDULED" = false;
            }

            queue = &scheduledFibersImmediate;
        } else if( fib.flag!"PRIORITY" ) {
            queue = &scheduledFibersHigh;
        } else {
            queue = &scheduledFibersNormal;
        }

        if (!fib.flag!"SCHEDULED") {
            if (fib._owner !is null) {
                // Whatever this fiber was waiting to do, it is no longer what it needs to be doing
                fib._owner.remove(fib);
            }
            fib.flag!"SCHEDULED" = true;
            queue.append(fib);
            nothingScheduled = false;
        }
    }
1937 
1938     ReactorFiber* _spawnFiber(bool immediate) nothrow @safe @nogc {
1939         ASSERT!"No more free fibers in pool"(!freeFibers.empty);
1940         auto fib = freeFibers.popHead();
1941         assert (!fib.flag!"CALLBACK_SET");
1942         fib.flag!"CALLBACK_SET" = true;
1943         fib.state = FiberState.Starting;
1944         fib._prevId = FiberIdx.invalid;
1945         fib._nextId = FiberIdx.invalid;
1946         fib._owner = null;
1947         fib.params.flsBlock.reset();
1948         resumeFiber(fib, immediate);
1949         return fib;
1950     }
1951 
    /**
     * Body of the idle fiber. Never returns.
     *
     * While nothing is scheduled, expired timers are run and the idle callbacks are called (or the thread sleeps, if
     * there are none). Time spent is accumulated into `stats.idleCycles`. Once something gets scheduled the
     * fiber switches to it, and the loop starts over when the idle fiber runs again.
     */
    void idleLoop() {
        while (true) {
            // [start, end] is the span that will be accounted as idle time
            TscTimePoint start, end;
            end = start = TscTimePoint.hardNow;

            while (nothingScheduled) {
                auto critSect = criticalSection();

                /*
                   Since we've updated "end" before calling the timers, these timers won't count as idle time, unless....
                   after running them the scheduledFibers list is still empty, in which case they do.
                 */
                if( runTimedCallbacks(end) )
                    continue;

                // We only reach here if runTimedCallbacks did nothing, in which case "end" is recent enough
                Duration sleepDuration = timeQueue.timeTillNextEntry(end);
                bool countsAsIdle = true;
                if( actualIdleCallbacks.length==1 ) {
                    // A single callback is handed the full duration until the next timer
                    countsAsIdle = actualIdleCallbacks[0](sleepDuration) && countsAsIdle;
                } else if ( actualIdleCallbacks.length>1 ) {
                    // Several callbacks: each is called with a zero duration
                    foreach(cb; actualIdleCallbacks) {
                        with( pushFiberName("Idle callback", cb) ) {
                            countsAsIdle = cb(ZERO_DURATION) && countsAsIdle;
                        }
                    }
                } else {
                    //DEBUG!"Idle fiber called with no callbacks, sleeping %sus"(sleepDuration.total!"usecs");
                    import core.thread; Thread.sleep(sleepDuration);
                }

                if( countsAsIdle ) {
                    end = TscTimePoint.hardNow;
                } else {
                    if( nothingScheduled ) {
                        // We are going in for another round, but this round should not count as idle time
                        stats.idleCycles += end.diff!"cycles"(start);
                        end = start = TscTimePoint.hardNow;
                    }
                }
            }
            stats.idleCycles += end.diff!"cycles"(start);
            switchToNext();
        }
    }
1997 
1998     @notrace bool runTimedCallbacks(TscTimePoint now = TscTimePoint.hardNow) {
1999         // Timer callbacks are not allowed to sleep
2000         auto criticalSectionContainer = criticalSection();
2001 
2002         bool ret;
2003 
2004         TimedCallback* callback;
2005         while ((callback = timeQueue.pop(now)) !is null) {
2006             callback.closure();
2007             if( callback.intervalCycles==0 )
2008                 timedCallbacksPool.release(callback);
2009             else
2010                 rescheduleRecurringTimer(callback);
2011 
2012             ret = true;
2013         }
2014 
2015         return ret;
2016     }
2017 
2018     void rescheduleRecurringTimer(TimedCallback* callback) nothrow @safe @nogc {
2019         ulong cycles = TscTimePoint.hardNow.cycles + callback.intervalCycles;
2020         cycles -= cycles % callback.intervalCycles;
2021         callback.timePoint = TscTimePoint(cycles);
2022 
2023         timeQueue.insert(callback);
2024     }
2025 
2026     ExcBuf* prepThrowInFiber(FiberHandle fHandle, bool updateBT, bool specialOkay = false) nothrow @safe @nogc {
2027         ReactorFiber* fib = fHandle.get();
2028         ASSERT!"Cannot throw in the reactor's own fibers"( !fib.flag!"SPECIAL" || specialOkay );
2029         if( fib is null ) {
2030             WARN!"Failed to throw exception in fiber %s which is no longer valid"(fHandle);
2031             return null;
2032         }
2033 
2034         if( fib.flag!"HAS_EXCEPTION" ) {
2035             ERROR!"Tried to throw exception in fiber %s which already has an exception pending"(fHandle);
2036             return null;
2037         }
2038 
2039         fib.flag!"HAS_EXCEPTION" = true;
2040         fib.flag!"EXCEPTION_BT" = updateBT;
2041         return &fib.params.currExcBuf;
2042     }
2043 
    /**
     * Hand an exception over to the main fiber and switch away from the current fiber.
     *
     * The exception is stored in the main fiber's exception buffer, the main fiber is scheduled, and
     * `switchToNext` is called. That call is not expected to return. The function does return, without doing
     * anything, if `prepThrowInFiber` refuses the request (returns `null`).
     *
     * Params:
     * ex = the exception to forward.
     */
    void forwardExceptionToMain(Throwable ex) nothrow @trusted @nogc {
        // specialOkay is set, since main is a special fiber
        ExcBuf* fiberEx = prepThrowInFiber(FiberHandle(mainFiber), false, true);

        if( fiberEx is null )
            return;

        fiberEx.set(ex);
        // resumeSpecialFiber requires the SPECIAL flag. performStopReactor clears it on main while stopping.
        if( !mainFiber.flag!"SPECIAL" ) {
            ASSERT!"Main fiber not marked as special but reactor is not stopping"( _stopping );
            mainFiber.flag!"SPECIAL" = true;
        }
        resumeSpecialFiber(mainFiber);
        as!"nothrow"(&theReactor.switchToNext);
        assert(false, "switchToNext on dead system returned");
    }
2059 
    /// Start the hang detector timer. It is a (debug) assertion failure if the timer is already set.
    void registerHangDetector() @trusted @nogc {
        DBG_ASSERT!"registerHangDetector called twice"(!hangDetectorTimer.isSet);
        hangDetectorTimer.start();
    }
2064 
    /// Cancel the hang detector timer started by `registerHangDetector`.
    void deregisterHangDetector() nothrow @trusted @nogc {
        hangDetectorTimer.cancel();
    }
2068 
2069     extern(C) static void hangDetectorHandler() nothrow @trusted @nogc {
2070         if( !theReactor._hangDetectorEnabled )
2071             return;
2072 
2073         auto now = TscTimePoint.hardNow();
2074         auto delay = now - theReactor.fiberRunStartTime;
2075 
2076         if( delay<theReactor.optionsInEffect.hangDetectorTimeout || theReactor.currentFiberId == IdleFiberId )
2077             return;
2078 
2079         long seconds, usecs;
2080         delay.split!("seconds", "usecs")(seconds, usecs);
2081 
2082         ERROR!"Hang detector triggered for %s after %s.%06s seconds"(theReactor.currentFiberId, seconds, usecs);
2083         dumpStackTrace();
2084 
2085         ABORT("Hang detector killed process");
2086     }
2087 
    /**
     * Install `faultHandler` as the handler for SIGSEGV, SIGILL and SIGBUS.
     *
     * The handler is registered with `SA_SIGINFO` (three argument handler), `SA_RESETHAND` (disposition is reset to
     * default once the handler is entered) and `SA_ONSTACK` (run on the alternate signal stack, if one is set up).
     *
     * A failure of `sigaction` is reported through `errnoEnforceNGC`.
     */
    void registerFaultHandlers() @trusted @nogc {
        posix_signal.sigaction_t action;
        action.sa_sigaction = &faultHandler;
        action.sa_flags = posix_signal.SA_SIGINFO | posix_signal.SA_RESETHAND | posix_signal.SA_ONSTACK;

        errnoEnforceNGC( posix_signal.sigaction(OSSignal.SIGSEGV, &action, null)==0, "Failed to register SIGSEGV handler" );
        errnoEnforceNGC( posix_signal.sigaction(OSSignal.SIGILL, &action, null)==0, "Failed to register SIGILL handler" );
        errnoEnforceNGC( posix_signal.sigaction(OSSignal.SIGBUS, &action, null)==0, "Failed to register SIGBUS handler" );
    }
2097 
    /// Return SIGBUS, SIGILL and SIGSEGV to their default disposition. The result of `signal` is not checked.
    void deregisterFaultHandlers() nothrow @trusted @nogc {
        posix_signal.signal(OSSignal.SIGBUS, posix_signal.SIG_DFL);
        posix_signal.signal(OSSignal.SIGILL, posix_signal.SIG_DFL);
        posix_signal.signal(OSSignal.SIGSEGV, posix_signal.SIG_DFL);
    }
2103 
    /**
     * Signal handler for the fault signals registered by `registerFaultHandlers`.
     *
     * Logs the type of fault, a stack trace, the faulting address and the program counter. When running on the
     * reactor thread it also logs the current fiber's stack boundaries, and reports whether the faulting address is
     * inside the `GUARD_ZONE_SIZE` bytes right below `bstack`.
     *
     * Params:
     * signum = the signal number being handled.
     * info = signal information. `si_addr` is used as the faulting address.
     * ctx = the machine context. If not null, the `rip` register is read from it.
     */
    @notrace extern(C) static void faultHandler(int signum, siginfo_t* info, void* ctx) nothrow @trusted @nogc {
        OSSignal sig = cast(OSSignal)signum;
        string faultName;

        // Translate the signal into a human readable name
        switch(sig) {
        case OSSignal.SIGSEGV:
            faultName = "Segmentation fault";
            break;
        case OSSignal.SIGILL:
            faultName = "Illegal instruction";
            break;
        case OSSignal.SIGBUS:
            faultName = "Bus error";
            break;
        default:
            faultName = "Unknown fault";
            break;
        }

        META!"#OSSIGNAL %s"(faultName);
        dumpStackTrace();
        flushLog(); // There is a certain chance the following lines themselves fault. Flush the logs now so that we have something

        Ucontext* contextPtr = cast(Ucontext*)ctx;
        // Program counter at the time of the fault, or 0 if no context was supplied
        auto pc = contextPtr ? contextPtr.uc_mcontext.registers.rip : 0;

        if( isReactorThread ) {
            auto currentSD = theReactor.getCurrentFiberPtr(true).stackDescriptor;
            ERROR!"%s on %s address 0x%x, PC 0x%x stack params at 0x%x"(
                    faultName, theReactor.currentFiberId, info.si_addr, pc, theReactor.getCurrentFiberPtr(true).params);
            ERROR!"Stack is at [%s .. %s]"( currentSD.bstack, currentSD.tstack );
            // The range checked is the GUARD_ZONE_SIZE bytes immediately below bstack
            auto guardAddrStart = currentSD.bstack - GUARD_ZONE_SIZE;
            if( info.si_addr < currentSD.bstack && info.si_addr >= guardAddrStart ) {
                ERROR!"Hit stack guard area"();
            }
        } else {
            ERROR!"%s on OS thread at address %s, PC %s"(faultName, info.si_addr, pc);
        }
        flushLog();

        // Exit the fault handler, which will re-execute the offending instruction. Since we registered as run once, the default handler
        // will then kill the node.
    }
2147 
    /**
     * Body of the main fiber: run the reactor until it is asked to stop.
     *
     * For the duration of the run the GC is disabled, the hang detector is active (if a timeout was configured) and
     * `_thisFiber` points at the main fiber. All of these are undone on exit. Unless `utGcDisabled` is set, a
     * recurring timer requesting (non forced) GC collections is registered.
     *
     * Each iteration runs the expired timers, performs a GC collection if one was requested, and switches to the
     * next scheduled fiber. When the loop ends, `performStopReactor` is called.
     */
    void mainloop() {
        assert (isOpen);
        assert (!isRunning);
        assert (_thisFiber is null);

        _running = true;
        scope(exit) _running = false;

        // From here on, collections only take place when we explicitly ask for them
        GC.disable();
        scope(exit) GC.enable();

        lastGCStats = GC.stats();

        if( optionsInEffect.utGcDisabled ) {
            // GC is disabled during the reactor run. Run it before we start
            GC.collect();
        }

        // Don't register the hang detector until after we've finished running the GC
        if( optionsInEffect.hangDetectorTimeout !is Duration.zero ) {
            registerHangDetector();
            _hangDetectorEnabled = true;
        } else {
            _hangDetectorEnabled = false;
        }

        scope(exit) {
            if( optionsInEffect.hangDetectorTimeout !is Duration.zero )
                deregisterHangDetector();

            _hangDetectorEnabled = false;
        }

        _thisFiber = mainFiber;
        scope(exit) _thisFiber = null;

        // Periodic, non forced, collection request. gcCollect decides whether a collection actually runs.
        if( !optionsInEffect.utGcDisabled )
            TimerHandle gcTimer = registerRecurringTimer!requestGCCollectionInternal(optionsInEffect.gcInterval, false);

        try {
            while (!_stopping) {
                DBG_ASSERT!"Switched to mainloop with wrong thisFiber %s"(thisFiber is mainFiber, thisFiber.identity);
                runTimedCallbacks();
                if( _gcCollectionNeeded )
                    gcCollect();

                // The timers or the collection may have asked the reactor to stop
                if( !_stopping )
                    switchToNext();
            }
        } catch( ReactorExit ex ) {
            ASSERT!"Main loop threw ReactorExit, but reactor is not stopping"(_stopping);
        }

        performStopReactor();
    }
2203 
2204     void gcCollect() {
2205         _gcCollectionNeeded = false;
2206         auto statsBefore = GC.stats();
2207 
2208         if(
2209             _gcCollectionForce ||
2210             optionsInEffect.gcRunThreshold==0 ||
2211             statsBefore.usedSize > lastGCStats.usedSize+optionsInEffect.gcRunThreshold )
2212         {
2213             TscTimePoint.hardNow(); // Update the hard now value
2214             DEBUG!"#GC collection cycle started, %s bytes allocated since last run (forced %s)"(statsBefore.usedSize - lastGCStats.usedSize, _gcCollectionForce);
2215 
2216             GC.collect();
2217             TscTimePoint.hardNow(); // Update the hard now value
2218             lastGCStats = GC.stats();
2219             DEBUG!"#GC collection cycle ended, freed %s bytes"(statsBefore.usedSize - lastGCStats.usedSize);
2220 
2221             _gcCollectionForce = false;
2222         }
2223     }
2224 
    /**
     * Bring the reactor down. Must run in the main fiber.
     *
     * A `ReactorExit` exception is thrown in every living fiber other than main, after which the main fiber yields.
     * Once the yield returns, `_stopping` is cleared.
     */
    void performStopReactor() @nogc {
        ASSERT!"performStopReactor must be called from the main fiber. Use Reactor.stop instead"( isMain );

        Throwable reactorExit = mkEx!ReactorExit("Reactor is quitting");
        foreach(ref fiber; allFibers[1..$]) { // All fibers but main
            if( fiber.isAlive ) {
                // NOTE(review): presumably cleared because throwing in a special fiber is otherwise refused - confirm
                fiber.flag!"SPECIAL" = false;
                throwInFiber(FiberHandle(&fiber), reactorExit);
            }
        }

        // NOTE(review): presumably lets main go through the regular yield() path like any other fiber - confirm
        thisFiber.flag!"SPECIAL" = false;
        yield();

        META!"Stopping reactor"();
        _stopping = false;
    }
2242 
2243     @notrace void setFiberName(ReactorFiber* fib, string name, void *ptr) nothrow @safe @nogc {
2244         fib.params.fiberName = name;
2245         fib.params.fiberPtr = ptr;
2246     }
2247 
2248     @notrace void setFiberName(T)(ReactorFiber* fib, string name, scope T dlg) nothrow @safe @nogc if( isDelegate!T ) {
2249         fib.params.fiberName = name;
2250         fib.params.fiberPtr = dlg.ptr;
2251     }
2252 
    /**
     * Run the pending cross fiber hook, then switch back to the fiber that installed it.
     *
     * The hook and its caller are the ones stored by `callInFiber`. Both are cleared before switching back.
     * NOTE(review): presumably invoked on the target fiber's side as part of being switched into - the call site is
     * not in this part of the file, confirm.
     */
    void performCrossFiberHook() @safe @nogc {
        ReactorFiber* callingFiber;

        // Perform the hook under critical section
        with(criticalSection()) {
            // The hook is not allowed to throw
            scope(failure) ASSERT!"Cross fiber hook threw"(false);
            crossFiberHook();

            callingFiber = crossFiberHookCaller.get();
            ASSERT!"Fiber invoking hook %s invalidated by hook"(callingFiber !is null, crossFiberHookCaller.identity);

            // Clear the hooks so they don't get called when returning to the hooker fiber
            crossFiberHook = null;
            crossFiberHookCaller.reset();
        }

        switchTo(callingFiber);
    }
2271 
    /**
     * Run a callback in the context of another fiber.
     *
     * The callback is stored as the cross fiber hook, and the current fiber switches directly to the target fiber
     * (bypassing the scheduling queues). By the time `switchTo` returns here the hook is expected to have been
     * cleared, see `performCrossFiberHook`.
     *
     * Params:
     * fh = the fiber in which to run the callback. If the handle is invalid nothing is done. If it is the current
     *     fiber, the callback is simply called, under critical section.
     * callback = the code to run. Otherwise the target fiber must be in either `Starting` or `Sleeping` state.
     */
    @notrace void callInFiber(FiberHandle fh, scope void delegate() nothrow @safe @nogc callback) @trusted @nogc {
        ASSERT!"Cannot set hook when one is already set"( crossFiberHook is null && !crossFiberHookCaller.isSet );
        ReactorFiber* fib = fh.get();
        if( fib is null )
            // Fiber isn't valid - don't do anything
            return;

        if( fib is thisFiber ) {
            // We were asked to switch to ourselves. Just run the callback
            auto critSect = criticalSection();
            callback();

            return;
        }

        with(FiberState) {
            ASSERT!"Trying to dump stack trace of %s which is in invalid state %s"(
                fib.state==Starting || fib.state==Sleeping, fib.identity, fib.state );
        }

        // We are storing a scoped delegate inside a long living pointer, but we make sure to finish using it before exiting.
        crossFiberHook = callback;
        crossFiberHookCaller = FiberHandle(thisFiber); // Don't call currentFiber, as we might be a special fiber

        switchTo(fib);

        DBG_ASSERT!"crossFiberHookCaller not cleared after call"( !crossFiberHookCaller.isSet );
        DBG_ASSERT!"crossFiberHook not cleared after call"( crossFiberHook is null );
    }
2301 
2302     import std..string : format;
2303     enum string decl_log_as(string logLevel) = q{
2304         @notrace public void %1$s_AS(
2305             string fmt, string file = __FILE_FULL_PATH__, string mod = __MODULE__, int line = __LINE__, T...)
2306             (FiberHandle fh, T args) nothrow @safe @nogc
2307         {
2308             auto fiber = fh.get;
2309             if( fiber is null ) {
2310                 ERROR!("Can't issue %1$s log as %%s. Original log: "~fmt, file, mod, line)(fh, args);
2311                 return;
2312             }
2313 
2314             auto currentFiber = getCurrentFiberPtr(true);
2315             fiber.logSwitchInto();
2316             scope(exit) currentFiber.logSwitchInto();
2317 
2318             %1$s!(fmt, file, mod, line)(args);
2319         }
2320     }.format(logLevel);
2321     mixin(decl_log_as!"DEBUG");
2322     mixin(decl_log_as!"INFO");
2323     mixin(decl_log_as!"WARN");
2324     mixin(decl_log_as!"ERROR");
2325     mixin(decl_log_as!"META");
2326 
2327     /**
2328      * Log the stack trace of a given fiber.
2329      *
2330      * The fiber should be in a valid state. This is the equivalent to the fiber itself running `dumpStackTrace`.
2331      */
2332     @notrace public void LOG_TRACEBACK_AS(
2333             FiberHandle fh, string text, string file = __FILE_FULL_PATH__, size_t line = __LINE__) @safe @nogc
2334     {
2335         callInFiber(fh, {
2336                 dumpStackTrace(text, file, line);
2337             });
2338     }
2339 }
2340 
// Conversions to/from ReactorFiber are visible to the reactor package only.
//
// Map a fiber slot index to a pointer to the ReactorFiber stored in that slot. An invalid index maps to null
// (without requiring the reactor to be open); any valid index requires an open reactor.
package ReactorFiber* to(T : ReactorFiber*)(FiberIdx fidx) nothrow @safe @nogc {
    if (fidx.isValid) {
        ASSERT!"Reactor is not open"( theReactor.isOpen );
        return &theReactor.allFibers[fidx.value];
    }

    return null;
}
2349 
// Map a ReactorFiber pointer back to its slot index inside the reactor's fibers pool. A null pointer maps to
// FiberIdx.invalid (without requiring the reactor to be open); any other pointer requires an open reactor.
package FiberIdx to(T : FiberIdx)(const ReactorFiber* rfp) nothrow @safe @nogc {
    if (rfp !is null) {
        ASSERT!"Reactor is not open"( theReactor.isOpen );

        // Distance, in elements, between the pointer and the first fiber of the pool
        auto slot = rfp - &theReactor.allFibers.arr[0];
        DBG_ASSERT!"Reactor fiber pointer not pointing to fibers pool: base %s ptr %s idx %s"(
                slot>=0 && slot<theReactor.allFibers.arr.length, &theReactor.allFibers.arr[0], rfp, slot);

        return FiberIdx( cast(ushort)slot );
    }

    return FiberIdx.invalid;
}
2360 
// Derive the fiber slot index from a FiberId by masking the id's value with the reactor's maxNumFibersMask.
// NOTE(review): presumably the bits outside the mask carry the fiber's incarnation - confirm against FiberId.
package FiberIdx to(T : FiberIdx)(FiberId fiberId) nothrow @safe @nogc {
    return FiberIdx( fiberId.value & theReactor.maxNumFibersMask );
}
2364 
// The single Reactor instance. Being __gshared, it is one object for the whole process rather than per thread.
private __gshared Reactor _theReactor;
// Per-thread flag backing isReactorThread.
private bool /* thread local */ _isReactorThread;
2367 
2368 /**
2369  * return a reference to the Reactor singleton
2370  *
2371  * In theory, @safe code must not access global variables. Since theReactor is only meant to be used by a single thread, however, this
2372  * function is @trusted. If it were not, practically no code could be @safe.
2373  */
2374 @property ref Reactor theReactor() nothrow @trusted @nogc {
2375     //DBG_ASSERT!"not main thread"(_isReactorThread);
2376     return _theReactor;
2377 }
2378 
/// Returns whether the current thread is the thread in which theReactor is running
///
/// The answer is read from a thread-local flag.
@property bool isReactorThread() nothrow @safe @nogc {
    return _isReactorThread;
}
2383 
2384 version (unittest) {
2385     /**
2386      * Run a test inside a reactor
2387      *
2388      * This is a convenience function for running a UT as a reactor fiber. A new reactor will be initialized, `dg` called
2389      * and the reactor will automatically stop when `dg` is done.
2390      */
2391     int testWithReactor(int delegate() dg, Reactor.OpenOptions options = Reactor.OpenOptions.init) {
2392         sigset_t emptyMask;
2393         errnoEnforceNGC( sigprocmask( SIG_SETMASK, &emptyMask, null )==0, "sigprocmask failed" );
2394 
2395         theReactor.setup(options);
2396         scope(success) theReactor.teardown();
2397 
2398         bool delegateReturned = false;
2399 
2400         void wrapper() {
2401             int ret;
2402             try {
2403                 ret = dg();
2404 
2405                 delegateReturned = true;
2406             } catch(ReactorExit ex) {
2407                 LOG_EXCEPTION(ex);
2408                 assert(false, "testWithReactor's body called theReactor.stop explicitly");
2409             } catch(FiberInterrupt ex) {
2410                 LOG_EXCEPTION(ex);
2411                 theReactor.stop();
2412             } catch(Throwable ex) {
2413                 // No need to stop the reactor - the exception thrown will teminate it
2414                 LOG_EXCEPTION(ex);
2415                 ERROR!"Test terminated abnormally"();
2416                 throw ex;
2417             }
2418 
2419             theReactor.stop( ret );
2420         }
2421 
2422         theReactor.spawnFiber(&wrapper);
2423         int ret = theReactor.start();
2424         assert (delegateReturned, "testWithReactor called with a delegate that threw without returning");
2425 
2426         return ret;
2427     }
2428 
2429     /// ditto
2430     void testWithReactor(void delegate() dg, Reactor.OpenOptions options = Reactor.OpenOptions.init) {
2431         int wrapper() {
2432             dg();
2433 
2434             return 0;
2435         }
2436 
2437         testWithReactor(&wrapper, options);
2438     }
2439 
2440     public import mecca.runtime.ut: mecca_ut;
2441 
    /**
     * Mixes in a unittest that runs the test cases of `FIXTURE` inside a reactor.
     *
     * The test cases are run through `runFixtureTestCases`, wrapped in `testWithReactor`. If `FIXTURE` has a member
     * called `reactorOptions`, it is used as the reactor's open options; otherwise the default options are used.
     * Anything thrown while running the test cases is logged, after which `DIE` is called.
     */
    mixin template TEST_FIXTURE_REACTOR(FIXTURE) {
        // Imported here so that the mixin does not depend on what the instantiating module has imported
        import mecca.runtime.ut: runFixtureTestCases;
        import mecca.reactor: testWithReactor, Reactor;
        unittest {
            Reactor.OpenOptions options;

            // Optional per-fixture reactor configuration
            static if( __traits(hasMember, FIXTURE, "reactorOptions") ) {
                options = FIXTURE.reactorOptions;
            }

            testWithReactor({
                    try {
                        runFixtureTestCases!(FIXTURE)();
                    } catch( Throwable ex ) {
                        import mecca.log: LOG_EXCEPTION;
                        import mecca.lib.exception: DIE;

                        LOG_EXCEPTION(ex);
                        DIE("UT failed due to exception");
                    }
                }, options);
        }
    }
2465 }
2466 
2467 
unittest {
    // Two fibers each print their name and yield, ten times over; whichever finishes first stops the reactor
    import std.stdio;

    theReactor.setup();
    scope(exit) theReactor.teardown();

    static void chatter(string label) {
        foreach(round; 0 .. 10) {
            writeln(label);
            theReactor.yield();
        }
        theReactor.stop();
    }

    theReactor.spawnFiber(&chatter, "hello");
    theReactor.spawnFiber(&chatter, "world");
    theReactor.start();
}
2486 
unittest {
    // Test simple timeout: six fibers sleep for various durations, all of which must be over before
    // the seventh fiber stops the reactor
    import std.stdio;

    theReactor.setup();
    scope(exit) theReactor.teardown();

    uint counter;
    TscTimePoint start;

    void sleeper(Duration duration) {
        INFO!"Fiber %s sleeping for %s"(theReactor.currentFiberHandle, duration.toString);
        theReactor.sleep(duration);
        auto wakeTime = TscTimePoint.hardNow;
        counter++;
        INFO!"Fiber %s woke up after %s, overshooting by %s counter is %s"(theReactor.currentFiberHandle, (wakeTime - start).toString,
                ((wakeTime-start) - duration).toString, counter);
    }

    void terminator() {
        INFO!"Fiber %s ender is sleeping for 250ms"(theReactor.currentFiberHandle);
        theReactor.sleep(dur!"msecs"(250));
        INFO!"Fiber %s ender woke up"(theReactor.currentFiberHandle);

        theReactor.stop();
    }

    // Sleep durations, in milliseconds, in spawn order
    static immutable int[6] sleepsMs = [10, 100, 150, 20, 30, 200];
    foreach(ms; sleepsMs)
        theReactor.spawnFiber(&sleeper, dur!"msecs"(ms));
    theReactor.spawnFiber(&terminator);

    start = TscTimePoint.hardNow;
    theReactor.start();
    auto finish = TscTimePoint.hardNow;
    INFO!"UT finished in %s"((finish - start).toString);

    assert(counter == 6, "Not all fibers finished");
}
2529 
unittest {
    // Test suspending timeout: suspending with a 4ms timeout and nobody to resume us must throw TimeoutExpired
    import std.stdio;

    theReactor.setup();
    scope(exit) theReactor.teardown();

    void waiter() {
        bool timedOut;

        try {
            theReactor.suspendCurrentFiber( Timeout(dur!"msecs"(4)) );
        } catch(TimeoutExpired ex) {
            timedOut = true;
        }

        assert(timedOut);

        theReactor.stop();
    }

    theReactor.spawnFiber(&waiter);
    theReactor.start();
}
2554 
unittest {
    // Test registering and cancelling timers, both one-shot and recurring
    import std.stdio;

    // GC running during the test mess with the timing
    Reactor.OpenOptions oo;
    oo.utGcDisabled = true;

    theReactor.setup(oo);
    scope(exit) theReactor.teardown();

    void fiberFunc() {
        TimerHandle[8] handles;
        Duration[8] timeouts = [
            dur!"msecs"(2),
            dur!"msecs"(200),
            dur!"msecs"(6),
            dur!"msecs"(120),
            dur!"msecs"(37),
            dur!"msecs"(40),
            dur!"msecs"(133),
            dur!"msecs"(8),
        ];

        // Bit i gets set once timer i has fired
        ubyte a;

        // Timer callback: record that the timer fired and reset its handle
        static void timer(ubyte* a, TimerHandle* handle, ubyte bit) {
            (*a) |= 1<<bit;

            (*handle) = TimerHandle.init;
        }

        foreach(ubyte i, duration; timeouts) {
            handles[i] = theReactor.registerTimer!timer( Timeout(duration), &a, &handles[i], i );
        }

        uint recurringCounter;
        static void recurringTimer(uint* counter) {
            (*counter)++;
        }

        TimerHandle recurringTimerHandle = theReactor.registerRecurringTimer!recurringTimer( dur!"msecs"(7), &recurringCounter );

        theReactor.sleep(dur!"msecs"(3));

        // Cancel one expired timeout and one yet to happen
        handles[0].cancelTimer();
        handles[6].cancelTimer();

        // Wait for all timers to run
        theReactor.sleep(dur!"msecs"(200));

        // Every bit except bit 6 (the timer cancelled before it was due) is expected to be set
        assert(a == 0b1011_1111);
        ASSERT!"Recurring timer should run 29 times, ran %s"(recurringCounter>=25 && recurringCounter<=30, recurringCounter); // 203ms / 7

        theReactor.stop();
    }

    theReactor.spawnFiber(&fiberFunc);
    theReactor.start();
}
2616 
unittest {
    // Test fiber state reporting (getFiberState and the per-state histogram) and throwing an exception into
    // another fiber
    import mecca.reactor.sync.event;

    // Linux has a fiber running for the signal handler, Darwin does not.
    version (Darwin)
        enum startupFiberCount = 0;
    else version (linux)
        enum startupFiberCount = 1;
    else
        static assert(false, "startupFiberCount is not defined for this platform");

    theReactor.setup();
    scope(exit) theReactor.teardown();

    Event evt1, evt2;

    class TheException : Exception {
        this() {
            super("The Exception");
        }
    }

    void fib2() {
        // Release 1
        evt1.set();

        try {
            // Wait for 1 to do its stuff
            evt2.wait();

            assert( false, "Exception not thrown" );
        } catch( Exception ex ) {
            assert( ex.msg == "The Exception" );
        }

        theReactor.stop();
    }

    void fib1() {
        assertEQ( theReactor.getFiberState(theReactor.currentFiberHandle), FiberState.Running );
        assertEQ( theReactor.reactorStats.fibersHistogram[FiberState.Starting], 0 );
        auto fib = theReactor.spawnFiber(&fib2);
        assertEQ( theReactor.getFiberState(fib), FiberState.Starting );
        assertEQ( theReactor.reactorStats.fibersHistogram[FiberState.Starting], 1 );

        evt1.wait();

        assertEQ( theReactor.reactorStats.fibersHistogram[FiberState.Starting], 0 );

        assertEQ( theReactor.reactorStats.fibersHistogram[FiberState.Sleeping], startupFiberCount + 1 );
        theReactor.throwInFiber(fib, new TheException);
        // The following should be "1", because the state would switch to Scheduled. Since that's not implemented yet...
        assertEQ( theReactor.reactorStats.fibersHistogram[FiberState.Sleeping], startupFiberCount + 1 ); // Should be 1
        assertEQ( theReactor.reactorStats.fibersHistogram[FiberState.Scheduled], 0 ); // Should be 1
        evt2.set();
    }

    theReactor.spawnFiber(&fib1);
    theReactor.start();
}
2675 
unittest {
    // Run a fiber that sleeps through the reactor for 200ms while the hang detector timeout is set to 20ms.
    // NOTE(review): presumably verifies that a reactor sleep is not reported as a hang - confirm.
    Reactor.OpenOptions options;
    options.hangDetectorTimeout = 20.msecs;
    options.utGcDisabled = true;
    DEBUG!"sanity: %s"(options.hangDetectorTimeout.toString);

    testWithReactor({
            theReactor.sleep(200.msecs);
            /+
            // To trigger the hang, uncomment this:
            import core.thread;
            Thread.sleep(200.msecs);
            +/
            }, options);
}
2691 
2692 /+
2693 unittest {
2694     // Trigger a segmentation fault
2695     theReactor.options.hangDetectorTimeout = 20.msecs;
2696     DEBUG!"sanity: %s"(theReactor.options.hangDetectorTimeout);
2697 
2698     testWithReactor({
2699             int* a = cast(int*) 16;
2700             *a = 3;
2701             });
2702 }
2703 +/
2704 
unittest {
    // Smoke test for the *_AS loggers: nothing fails automatically, the calls are merely exercised
    import mecca.reactor.sync.event;
    Event finish;

    META!"#UT exercise log_AS functions"();

    // The fiber on whose behalf the logs are issued; it just waits to be released
    void loggedFiber() {
        DEBUG!"Fiber started"();
        finish.wait();
        DEBUG!"Fiber finished"();
    }

    void scenario() {
        auto target = theReactor.spawnFiber(&loggedFiber);
        theReactor.yield();

        DEBUG!"Main fiber logging as %s"(target);
        theReactor.DEBUG_AS!"DEBUG trace with argument %s"(target, 17);
        theReactor.INFO_AS!"INFO trace"(target);
        theReactor.WARN_AS!"WARN trace"(target);
        theReactor.ERROR_AS!"ERROR trace"(target);
        theReactor.META_AS!"META trace"(target);

        DEBUG!"Killing fiber"();
        finish.set();
        theReactor.yield();

        // The handle is used once more after the fiber has been released
        DEBUG!"Trying to log as dead fiber"();
        theReactor.DEBUG_AS!"DEBUG trace on dead fiber with argument %s"(target, 18);
    }

    testWithReactor(&scenario);
}
2739 
unittest {
    // A relaunched fiber must not immediately get the same FiberId as the one before it
    FiberId firstId;

    void recordId() {
        firstId = theReactor.currentFiberId();
    }

    void expectDifferentId() {
        assert(theReactor.currentFiberId() != firstId);
    }

    testWithReactor({
            theReactor.spawnFiber(&recordId);
            theReactor.yield();
            theReactor.spawnFiber(&expectDifferentId);
            theReactor.yield();
        });
}
2759 
unittest {
    // FiberHandle.get must hand back the ReactorFiber* of the fiber the handle was taken from
    static void checkHandle() {
        assert( theReactor.currentFiberPtr == theReactor.currentFiberHandle.get() );
    }

    testWithReactor({
            // Run twice to make sure the generation isn't 0
            foreach(attempt; 0 .. 2) {
                theReactor.spawnFiber!checkHandle();
                theReactor.yield();
            }
        });
}
2773 
unittest {
    // The value returned by the test body must come back out of testWithReactor
    int exitCode = testWithReactor( { return 17; } );

    assert( exitCode==17 );
}
2779 
unittest {
    // test priority of scheduled fibers

    // Ids of the fibers, in the order in which they resumed running
    string[] runOrder;

    // Fiber body: optionally boost own priority, suspend, and record the id once resumed
    void verify(bool Priority)(string id) {

        static if( Priority ) {
            auto priorityWatcher = theReactor.boostFiberPriority();
        }
        theReactor.suspendCurrentFiber();
        runOrder ~= id;
    }

    testWithReactor({
            FiberHandle[] fh;
            fh ~= theReactor.spawnFiber(&verify!false, "reg1");
            fh ~= theReactor.spawnFiber(&verify!false, "reg2");
            fh ~= theReactor.spawnFiber(&verify!false, "imm1"); // Scheduled immediate
            fh ~= theReactor.spawnFiber(&verify!true, "pri1");
            fh ~= theReactor.spawnFiber(&verify!false, "imm2"); // Scheduled immediate
            fh ~= theReactor.spawnFiber(&verify!true, "pri2");
            fh ~= theReactor.spawnFiber(&verify!false, "reg3");
            fh ~= theReactor.spawnFiber(&verify!true, "pri3"); // reg1 bypasses this one in order to avoid starvation
            fh ~= theReactor.spawnFiber(&verify!false, "imm3"); // Scheduled immediate
            theReactor.yield();
            // Resume everybody in spawn order; only imm1 and imm2 are resumed as immediate at this point
            foreach(i; 0..fh.length)
                theReactor.resumeFiber( fh[i], i==2 || i==4 );

            // Resuming immediate an already scheduled fiber should change its priority
            theReactor.resumeFiber( fh[8], true );

            theReactor.yield();
            assertEQ(["imm1", "imm2", "imm3", "pri1", "pri2", "reg1", "pri3", "reg2", "reg3"], runOrder);
        });
}
2817 
unittest {
    // Test join: follow a fiber's reported state from spawn until after it has been joined

    static void yieldThrice() {
        foreach(round; 0 .. 3)
            theReactor.yield();
    }

    testWithReactor({
            FiberHandle joinee = theReactor.spawnFiber!yieldThrice();
            assertEQ( theReactor.getFiberState(joinee), FiberState.Starting );
            theReactor.yield();
            assertEQ( theReactor.getFiberState(joinee), FiberState.Scheduled );
            theReactor.joinFiber(joinee);
            assertEQ( theReactor.getFiberState(joinee), FiberState.None );
            // Joining again, with a zero timeout, after the first join has returned
            theReactor.joinFiber(joinee, Timeout(Duration.zero));
        });
}
2837 
unittest {
    // Throw FiberKilled into two fibers that would otherwise loop forever
    testWithReactor({
            void napForever() {
                for(;;)
                    theReactor.sleep(1.msecs);
            }

            FiberHandle[] victims;
            victims ~= theReactor.spawnFiber(&napForever);
            victims ~= theReactor.spawnFiber(&napForever);
            theReactor.sleep(5.msecs);

            foreach(victim; victims) {
                theReactor.throwInFiber!FiberKilled(victim);
            }

            DEBUG!"Sleeping for 1ms"();
            theReactor.sleep(1.msecs);
            DEBUG!"Woke up from sleep"();
        });
}
2859 
unittest {
    // Exercise LOG_TRACEBACK_AS: dump the stack of another fiber four times, yielding in between
    testWithReactor({
            static void longSleeper() {
                theReactor.sleep(1.seconds);
            }

            auto target = theReactor.spawnFiber!longSleeper();

            DEBUG!"Starting test"();
            foreach(attempt; 0 .. 4) {
                theReactor.LOG_TRACEBACK_AS(target, "test");
                DEBUG!"Back in fiber"();
                theReactor.yield();
            }
            DEBUG!"Test ended"();
        });
}