1 /// Define the micro-threading reactor
2 module mecca.reactor;
3
4 // Licensed under the Boost license. Full copyright information in the AUTHORS file
5
6 static import posix_signal = core.sys.posix.signal;
7 static import posix_time = core.sys.posix.time;
8 import core.memory: GC;
9 import core.sys.posix.signal;
10 import core.sys.posix.sys.mman: munmap, mprotect, PROT_NONE;
11 import core.thread: thread_isMainThread;
12
13 import std.exception;
import std.string;
15 import std.traits;
16
17 import mecca.containers.arrays;
18 import mecca.containers.lists;
19 import mecca.containers.pools;
20 import mecca.lib.concurrency;
21 import mecca.lib.consts;
22 import mecca.lib.exception;
23 import mecca.lib.integers : bitsInValue, createBitMask;
24 import mecca.lib.memory;
25 import mecca.lib.reflection;
26 import mecca.lib.time;
27 import mecca.lib.time_queue;
28 import mecca.lib.typedid;
29 import mecca.log;
30 import mecca.log.impl;
31 import mecca.platform.os;
32 import mecca.reactor.fiber_group;
33 import mecca.reactor.fls;
34 import mecca.reactor.impl.fibril: Fibril;
35 import mecca.reactor.subsystems.threading;
36 import mecca.reactor.sync.event: Signal;
37 public import mecca.reactor.types;
38
39 import std.stdio;
40
/// Handle for manipulating registered timers.
alias TimerHandle = Reactor.TimerHandle;
/// Counter type used to tell apart successive fibers that reuse the same `ReactorFiber` slot
/// (see `ReactorFiber.incarnationCounter`).
alias FiberIncarnation = ushort;

// The slot number in the stacks for the fiber
private alias FiberIdx = AlgebraicTypedIdentifier!("FiberIdx", ushort, ushort.max, ushort.max);
47
/// Track the fiber's state
enum FiberState : ubyte {
    None, /// Fiber isn't running
    Starting, /// Fiber was spawned, but has not yet started running
    Scheduled, /// Fiber is waiting to run
    Running, /// Fiber is running
    Sleeping, /// Fiber is currently suspended
    Done, /// Fiber has finished running (transient; handles assert they never observe it)
}
57
private {
    // Externally defined exception-handling context swap routines. Both take the context to install and
    // return a context pointer. `ReactorFiber.swapEhContextChooser` probes both and keeps whichever one
    // returns a non-null context.
    // NOTE(review): presumably provided by druntime (one per EH implementation) — confirm.
    extern(C) void* _d_eh_swapContext(void* newContext) nothrow @nogc;
    extern(C) void* _d_eh_swapContextDwarf(void* newContext) nothrow @nogc;
}
62
/// Bookkeeping record for a single reactor fiber.
///
/// The record itself is kept small (see the size `static assert` below). The bulkier per-fiber data lives in
/// an `OnStackParams` instance that `setup` places at the end of the fiber's stack area.
struct ReactorFiber {
    // Prevent accidental copying
    @disable this(this);

    /// Per-fiber data stored at the end of the fiber's stack area (see `setup`).
    static struct OnStackParams {
        Closure fiberBody; // The callable run by `wrapper`
        union {
            DRuntimeStackDescriptor _stackDescriptor; // Used by every fiber except the main one
            DRuntimeStackDescriptor* stackDescriptorPtr; // Used by the main fiber only
        }
        FiberGroup.Chain fgChain;
        FLSArea flsBlock;
        ExcBuf currExcBuf;
        LogsFiberSavedContext logsSavedContext;
        string fiberName; // String identifying what this fiber is doing
        void* fiberPtr; // The same with a numerical value
        Signal joinWaiters; // Signalled by `wrapper` when the fiber body ends
    }
    /// Bit values stored in `_flags`; accessed through the `flag!"NAME"` properties.
    enum Flags: ubyte {
        // XXX Do we need CALLBACK_SET?
        CALLBACK_SET = 0x01, /// Has callback set
        SPECIAL = 0x02, /// Special fiber. Not a normal user fiber
        MAIN = 0x04, /// This is the main fiber (i.e. - the "not a fiber")
        SCHEDULED = 0x08, /// Fiber currently scheduled to be run
        SLEEPING = 0x10, /// Fiber is sleeping on a sync object
        HAS_EXCEPTION = 0x20, /// Fiber has pending exception to be thrown in it
        EXCEPTION_BT = 0x40, /// Fiber exception needs to have fiber's backtrace
        PRIORITY = 0x80, /// Fiber is a high priority one
    }

align(1):
    Fibril fibril;
    OnStackParams* params;
    LinkedListWithOwner!(ReactorFiber*)* _owner;
    FiberIdx _nextId;
    FiberIdx _prevId;
    FiberIncarnation incarnationCounter;
    ubyte _flags;
    FiberState _state;
    // Starts out pointing at the chooser, which replaces it with the concrete swap function once it can tell
    // which one is in use.
    static extern(C) void* function(void*) @nogc nothrow _swapEhContext = &swapEhContextChooser;

    // We define this struct align(1) for the sole purpose of making the following static assert verify what it's supposed to
    static assert (this.sizeof == 32); // keep it small and cache-line friendly

    // LinkedQueue access through property
    /// Next fiber in the list, converted from the stored `FiberIdx`.
    @property ReactorFiber* _next() const nothrow @safe @nogc {
        return to!(ReactorFiber*)(_nextId);
    }

    /// Set the next link directly from a slot index.
    @property void _next(FiberIdx newNext) nothrow @safe @nogc {
        _nextId = newNext;
    }

    /// Set the next link from a fiber pointer (stored as its `FiberIdx`).
    @property void _next(ReactorFiber* newNext) nothrow @safe @nogc {
        _nextId = to!FiberIdx(newNext);
    }

    /// Previous fiber in the list, converted from the stored `FiberIdx`.
    @property ReactorFiber* _prev() const nothrow @safe @nogc {
        return to!(ReactorFiber*)(_prevId);
    }

    /// Set the previous link directly from a slot index.
    @property void _prev(FiberIdx newPrev) nothrow @safe @nogc {
        _prevId = newPrev;
    }

    /// Set the previous link from a fiber pointer (stored as its `FiberIdx`).
    @property void _prev(ReactorFiber* newPrev) nothrow @safe @nogc {
        _prevId = to!FiberIdx(newPrev);
    }

    /**
     * Initialize the fiber over the given stack area.
     *
     * Params:
     *  stackArea = memory for this fiber; its last `OnStackParams.sizeof` bytes hold the `OnStackParams`
     *  main = true for the main fiber, which gets no fibril entry point and borrows the thread's own stack
     *         descriptor instead of registering a new one
     */
    @notrace void setup(void[] stackArea, bool main) nothrow @nogc {
        _flags = 0;

        if( !main )
            fibril.set(stackArea[0 .. $ - OnStackParams.sizeof], &wrapper);
        else
            flag!"MAIN" = true;

        params = cast(OnStackParams*)&stackArea[$ - OnStackParams.sizeof];
        setToInit(params);

        if( !main ) {
            params._stackDescriptor.bstack = stackArea.ptr + stackArea.length; // Include params, as FLS is stored there
            params._stackDescriptor.tstack = fibril.rsp;
            params._stackDescriptor.add();
        } else {
            import core.thread: Thread;
            params.stackDescriptorPtr = cast(DRuntimeStackDescriptor*)accessMember!("m_curr")(Thread.getThis());
            DBG_ASSERT!"MAIN not set on main fiber"( flag!"MAIN" );
        }

        _next = null;
        incarnationCounter = 0;
    }

    /// Undo `setup`: reset the fibril, remove the stack descriptor added for non-main fibers, drop `params`.
    @notrace void teardown() nothrow @nogc {
        fibril.reset();
        if (!flag!"MAIN") {
            params._stackDescriptor.remove();
        }
        params = null;
    }

    /// Switch execution from this fiber to `next`, swapping the EH context and the thread's current stack
    /// descriptor before switching fibrils.
    @notrace void switchTo(ReactorFiber* next) nothrow @trusted @nogc {
        pragma(inline, true);
        import core.thread: Thread;

        DRuntimeStackDescriptor* currentSD = stackDescriptor;
        DRuntimeStackDescriptor* nextSD = next.stackDescriptor;

        // Install the next fiber's EH context and remember ours for when we are switched back in
        currentSD.ehContext = _swapEhContext(nextSD.ehContext);

        // Since druntime does not expose the interfaces needed for switching fibers, we need to hack around the
        // protection system to access Thread.m_curr, which is private.
        DRuntimeStackDescriptor** threadCurrentSD = cast(DRuntimeStackDescriptor**)&accessMember!("m_curr")(Thread.getThis());
        *threadCurrentSD = nextSD;

        // The address of our descriptor's tstack field is handed to the fibril switch
        fibril.switchTo(next.fibril, &currentSD.tstack);
    }

    /// The stack descriptor for this fiber: embedded in `params` for regular fibers, the pointer captured
    /// in `setup` for the main fiber.
    @notrace @property private DRuntimeStackDescriptor* stackDescriptor() @trusted @nogc nothrow {
        if( !flag!"MAIN" ) {
            return &params._stackDescriptor;
        } else {
            return params.stackDescriptorPtr;
        }
    }

    /// The fiber's `FiberId`: the slot index in the low bits, with the incarnation counter shifted in above
    /// `theReactor.maxNumFibersBits`.
    @property FiberId identity() const nothrow @safe @nogc {
        FiberId.UnderlyingType res;
        res = to!FiberIdx(&this).value;
        res |= incarnationCounter << theReactor.maxNumFibersBits;

        return FiberId(res);
    }

    /// Read the flag named `NAME` (a member of `Flags`).
    @property bool flag(string NAME)() const pure nothrow @safe @nogc {
        return (_flags & __traits(getMember, Flags, NAME)) != 0;
    }
    /// Set or clear the flag named `NAME` (a member of `Flags`).
    @property void flag(string NAME)(bool value) pure nothrow @safe @nogc {
        if (value) {
            _flags |= __traits(getMember, Flags, NAME);
        }
        else {
            import mecca.lib.integers: bitComplement;
            _flags &= bitComplement(__traits(getMember, Flags, NAME));
        }
    }

    /// Current `FiberState`.
    @property FiberState state() pure const nothrow @safe @nogc {
        return _state;
    }

    /// Change the fiber's state, moving one count in the reactor's `fibersHistogram` from the old state's
    /// bucket to the new one.
    @property void state(FiberState newState) nothrow @safe @nogc {
        DBG_ASSERT!"%s trying to switch state from %s to %s, but histogram claims no fibers in this state. %s"(
                theReactor.stats.fibersHistogram[_state]>0, identity, _state, newState,
                theReactor.stats.fibersHistogram );

        theReactor.stats.fibersHistogram[_state]--;
        theReactor.stats.fibersHistogram[newState]++;

        _state = newState;
    }

    /// Whether the fiber is in any state other than `None`. Observing `Done` here is treated as a bug.
    @property bool isAlive() const pure nothrow @safe @nogc {
        with(FiberState) switch( state ) {
        case None:
            return false;
        case Done:
            assert(false, "Fiber is in an invalid state Done");
        default:
            return true;
        }
    }

private:
    /// Entry point installed on the fibril by `setup`. Never returns: each loop iteration runs one fiber body
    /// (unless told to skip it), reports the outcome, cleans the slot and hands control back to the reactor.
    @notrace void wrapper() nothrow {
        bool skipBody;
        Throwable ex = null;

        try {
            // Nobody else will run it the first time the fiber is started. This results in some unfortunate
            // code duplication
            switchInto();
        } catch (FiberInterrupt ex2) {
            INFO!"Fiber %s killed by FiberInterrupt exception: %s"(identity, ex2.msg);
            skipBody = true;
        } catch(Throwable ex2) {
            ex = ex2;
            skipBody = true;
        }

        while (true) {
            DBG_ASSERT!"skipBody=false with pending exception"(ex is null || skipBody);
            scope(exit) ex = null;

            if( !skipBody ) {
                params.logsSavedContext = LogsFiberSavedContext.init;
                INFO!"Fiber %s started generation %s flags=0x%0x"(identity, incarnationCounter, _flags);

                ASSERT!"Reactor's current fiber isn't the running fiber" (theReactor.thisFiber is &this);
                ASSERT!"Fiber %s is in state %s instead of Running" (state == FiberState.Running, identity, state);

                try {
                    // Perform the actual fiber callback
                    params.fiberBody();
                } catch (FiberInterrupt ex2) {
                    INFO!"Fiber %s killed by FiberInterrupt exception: %s"(identity, ex2.msg);
                } catch (Throwable ex2) {
                    ex = ex2;
                }
            }

            ASSERT!"Fiber still member of fiber group at termination" (params.fgChain.owner is null);

            params.joinWaiters.signal();
            if( ex is null ) {
                INFO!"Fiber %s finished"(identity);
            } else {
                ERROR!"Fiber %s finished with exception: %s"(identity, ex.msg);
                LOG_EXCEPTION(ex);
            }

            params.fiberBody.clear();
            params.fiberName = null;
            params.fiberPtr = null;
            flag!"CALLBACK_SET" = false;
            ASSERT!"Fiber %s state is %s instead of Running at end of execution"(
                    state == FiberState.Running, identity, state);
            state = FiberState.Done;
            // Bumping the counter invalidates FiberHandles taken for the incarnation that just ended
            incarnationCounter++;
            if (ex !is null) {
                theReactor.forwardExceptionToMain(ex);
                assert(false);
            } else {
                // NOTE(review): the returned value decides whether the next iteration skips the body;
                // its exact meaning is defined by Reactor.fiberTerminated, which is not visible here.
                skipBody = theReactor.fiberTerminated();
            }
        }
    }

    /// Housekeeping performed when this fiber gains the CPU: switch the exception buffer, FLS area and log
    /// context, run any pending cross-fiber hook, then throw the exception pending for this fiber, if any.
    void switchInto() @safe @nogc {
        // We might have a cross fiber hook. If we do, we need to repeat this part several times
        bool switched = false;
        do {
            switchCurrExcBuf( &params.currExcBuf );
            if (!flag!"SPECIAL") {
                params.flsBlock.switchTo();
            } else {
                FLSArea.switchToNone();
            }
            logSwitchInto();

            if( theReactor.crossFiberHook !is null ) {
                theReactor.performCrossFiberHook();
            } else {
                switched = true;
            }
        } while(!switched);

        if (flag!"HAS_EXCEPTION") {
            Throwable ex = params.currExcBuf.get();
            ASSERT!"Fiber has exception, but exception is null"(ex !is null);
            if (flag!"EXCEPTION_BT") {
                params.currExcBuf.setTraceback(ex);
                flag!"EXCEPTION_BT" = false;
            }

            flag!"HAS_EXCEPTION" = false;
            throw ex;
        }
    }

    /// Tell the logging subsystem that this fiber is now the current one.
    void logSwitchInto() nothrow @safe @nogc{
        logSwitchFiber(&params.logsSavedContext, cast( Parameters!logSwitchFiber[1] )identity.value);
    }

private:
    /// Initial value of `_swapEhContext`. Calls both candidate swap functions; the first one found to return
    /// a non-null context is installed into `_swapEhContext` for all later calls. Returns null while neither
    /// candidate has produced a context yet.
    static extern(C) void* swapEhContextChooser(void * newContext) @nogc nothrow @notrace {
        DBG_ASSERT!"Context is not null on first invocation"(newContext is null);
        void* std = _d_eh_swapContext(newContext);
        void* dwarf = _d_eh_swapContextDwarf(newContext);

        if( std !is null ) {
            _swapEhContext = &_d_eh_swapContext;
            return std;
        } else if( dwarf !is null ) {
            _swapEhContext = &_d_eh_swapContextDwarf;
            return dwarf;
        }

        // Cannot tell which is correct yet
        return null;
    }
}
356
357
/**
  A handle to a running fiber.

  The handle expires on its own once the fiber it was taken for stops running. Unless program logic guarantees
  the fiber is still running, do not assume a live fiber stands behind the handle.

  A handle keeps reporting itself as invalid even after a new fiber has been launched with the same FiberId.
 */
struct FiberHandle {
private:
    FiberId identity;
    FiberIncarnation incarnation;

public:
    /// Reports whether the handle was ever assigned a fiber.
    ///
    /// In contrast to `isValid`, no liveness check is made: this only tells an initialized handle from an
    /// uninitialized one.
    @property bool isSet() pure const nothrow @safe @nogc {
        return identity.isValid;
    }

    /// Reports whether the handle refers to a fiber that is running right now.
    @property bool isValid() const nothrow @safe @nogc {
        return get() !is null;
    }

    /// The `FiberId` behind the handle, or `FiberId.invalid` once the handle has expired.
    ///
    /// Call `getFiberId` to obtain the `FiberId` of an expired handle.
    @property FiberId fiberId() const nothrow @safe @nogc {
        return isValid ? identity : FiberId.invalid;
    }

    /// The `FiberId` the handle was created with, regardless of whether it is still valid.
    FiberId getFiberId() const nothrow @safe @nogc pure {
        return identity;
    }

    /// Return the handle to its uninitialized state.
    @notrace void reset() nothrow @safe @nogc {
        this = FiberHandle.init;
    }

package:
    this(ReactorFiber* fib) nothrow @safe @nogc {
        opAssign(fib);
    }

    // Capture the identity and incarnation of `fib`; a null pointer makes the handle unset.
    auto ref opAssign(ReactorFiber* fib) nothrow @safe @nogc {
        if (fib is null) {
            identity = FiberId.invalid;
        } else {
            identity = fib.identity;
            incarnation = fib.incarnationCounter;
        }
        return this;
    }

    // Resolve the handle to its fiber, or null when the reactor is not running, the handle is unset, or the
    // slot is free or has moved on to another incarnation.
    ReactorFiber* get() const nothrow @safe @nogc {
        if (!theReactor.isRunning || !identity.isValid)
            return null;

        ReactorFiber* candidate = &theReactor.allFibers[to!FiberIdx(identity).value];

        DBG_ASSERT!"Fiber state is transient state Done"(candidate.state != FiberState.Done);
        immutable bool expired =
            candidate.state == FiberState.None || candidate.incarnationCounter != incarnation;

        return expired ? null : candidate;
    }
}
439
440 /**
441 The main scheduler for the micro-threading architecture.
442 */
443 struct Reactor {
    // Prevent accidental copying
    @disable this(this);

    /// Delegates passed to `registerIdleCallback` must be of this signature.
    ///
    /// The `Duration` argument is the timeout the idler is asked to honour; the `bool` result tells the
    /// reactor whether the time spent in the idler counts as idle time (see `registerIdleCallback`).
    alias IdleCallbackDlg = bool delegate(Duration);
449
    /// The options control aspects of the reactor's operation
    struct OpenOptions {
        /// Maximum number of fibers.
        ushort numFibers = 256;
        /// Stack size of each fiber (except the main fiber). The reactor will allocate numFibers*fiberStackSize during startup
        size_t fiberStackSize = 32*KB;
        /**
          How often does the GC's collection run.

          The reactor uses unconditional periodic collection, rather than the lazy evaluation one employed by the
          default GC settings. This setting sets how often the collection cycle should run. See `gcRunThreshold` for
          how to not run the GC collection when not needed.
         */
        Duration gcInterval = 30.seconds;

        /**
         * Allocation threshold to trigger a GC run
         *
         * If the amount of memory allocated since the previous GC run is less than this amount of bytes, the GC scan
         * will be skipped.
         *
         * Setting this value to 0 forces a run every `gcInterval`, regardless of how much was allocated.
         */
        size_t gcRunThreshold = 16*MB;

        /**
          Base granularity of the reactor's timer.

          Any scheduled task is scheduled at no better accuracy than timerGranularity. $(B In addition to) the fact that
          a timer task may be delayed. As such, with a 1ms granularity, a task scheduled for 1.5ms from now is the same
          as a task scheduled for 2ms from now, which may run 3ms from now.
         */
        Duration timerGranularity = 1.msecs;
        /**
          Hogger detection threshold.

          A hogger is a fiber that does not release the CPU to run other tasks for a long period of time. Often, this is
          a result of a bug (i.e. - calling the OS's `sleep` instead of the reactor's).

          Hogger detection works by measuring how long each fiber took until it allows switching away. If the fiber took
          more than hoggerWarningThreshold, a warning is logged.
         */
        Duration hoggerWarningThreshold = 200.msecs;
        /**
          Maximum desired fiber run time

          A fiber should aim not to run more than this much time without a voluntary context switch. This value affects
          the `shouldYield` and `considerYield` calls.
         */
        Duration maxDesiredRunTime = 150.msecs;
        /**
          Hard hang detection.

          This is a similar safeguard to that used by the hogger detection. If activated (disabled by default), it
          preemptively prompts the reactor every set time to see whether fibers are still being switched in/out. If it
          finds that the same fiber is running, without switching out, for too long, it terminates the entire program.
         */
        Duration hangDetectorTimeout = Duration.zero;
        /**
          Whether to enable fault handlers

          The fault handlers catch program faults (such as segmentation faults) and log them with their backtrace.
         */
        bool faultHandlersEnabled = true;

        /// Maximal number of timers that can be simultaneously registered.
        size_t numTimers = 10_000;

        /// Number of threads servicing deferred tasks
        uint numThreadsInPool = 4;
        /// Worker thread stack size
        size_t threadStackSize = 512*KB;

        /// Whether we have enabled deferToThread
        bool threadDeferralEnabled;

        /**
          Whether the reactor should register the default (fd processing) idle handler

          If this value is set to `false`, the poller is still opened. Its idle function would not be automatically
          called, however, so file operations might block indefinitely unless another mechanism (such as timer based)
          is put in place to call it periodically.

          The non-registered idle handler can be manually triggered by calling `poller.poll`.
         */
        bool registerDefaultIdler = true;

        version(unittest) {
            /// Disable all GC collection during the reactor run time. Only available for UTs.
            bool utGcDisabled;
        } else {
            // Outside unit tests the option is a compile-time constant, so it cannot be turned on
            private enum utGcDisabled = false;
        }
    }
544
545 /// Used by `reportStats` to report statistics about the reactor
546 struct Stats {
547 ulong[FiberState.max+1] fibersHistogram; /// Total number of user fibers in each `FiberState`
548 ulong numContextSwitches; /// Number of time we switched between fibers
549 ulong idleCycles; /// Total number of idle cycles
550
551 /// Returns number of currently used fibers
552 @property ulong numUsedFibers() pure const nothrow @safe @nogc {
553 ulong ret;
554
555 with(FiberState) {
556 foreach( state; [Starting, Scheduled, Running, Sleeping]) {
557 ret += fibersHistogram[state];
558 }
559 }
560
561 return ret;
562 }
563
564 /// Returns number of fibers available to be run
565 @property ulong numFreeFibers() pure const nothrow @safe @nogc {
566 ulong ret;
567
568 with(FiberState) {
569 foreach( state; [None]) {
570 ret += fibersHistogram[state];
571 }
572
573 DBG_ASSERT!"%s fibers in Done state. Should be 0"(fibersHistogram[Done] == 0, fibersHistogram[Done]);
574 }
575
576 return ret;
577 }
578 }
579
private:
    enum MAX_IDLE_CALLBACKS = 16;
    enum TIMER_NUM_BINS = 256;
    enum TIMER_NUM_LEVELS = 4;
    enum MAX_DEFERRED_TASKS = 1024;
    version(unittest) package enum UT_MAX_DEFERRED_TASKS = MAX_DEFERRED_TASKS;

    // Size of the no-access region protected at the start of every fiber's stack allocation (see `setup`)
    enum GUARD_ZONE_SIZE = SYS_PAGE_SIZE;

    // The main fiber and the idle fiber; these slots are never placed on the free list
    enum NUM_SPECIAL_FIBERS = 2;
    enum ZERO_DURATION = Duration.zero;

    enum MainFiberId = FiberId(0);
    enum IdleFiberId = FiberId(1);

    bool _open;
    bool _running;
    bool _stopping;
    bool _gcCollectionNeeded;
    bool _gcCollectionForce;
    bool _hangDetectorEnabled;
    ubyte maxNumFibersBits; // Number of bits sufficient to represent the maximal number of fibers
    bool nothingScheduled; // true if there is no scheduled fiber
    enum HIGH_PRIORITY_SCHEDULES_RATIO = 2; // How many high priority schedules before scheduling a low priority fiber
    ubyte highPrioritySchedules; // Number of high priority schedules done
    FiberIdx.UnderlyingType maxNumFibersMask;
    int reactorReturn; // Value returned from `start`; set by `stop`
    int criticalSectionNesting; // Depth of nested critical sections; 0 means not in one
    OpenOptions optionsInEffect; // Copy of the options passed to `setup`
    Stats stats;
    GC.Stats lastGCStats;

    MmapBuffer fiberStacks;
    MmapArray!ReactorFiber allFibers;
    LinkedQueueWithLength!(ReactorFiber*) freeFibers;
    enum FiberPriorities { NORMAL, HIGH, IMMEDIATE };
    LinkedListWithOwner!(ReactorFiber*) scheduledFibersNormal, scheduledFibersHigh, scheduledFibersImmediate;

    ReactorFiber* _thisFiber;
    ReactorFiber* mainFiber;
    ReactorFiber* idleFiber;
    FixedArray!(IdleCallbackDlg, MAX_IDLE_CALLBACKS) idleCallbacks;
    // Point to idleCallbacks as a range, in case it gets full and we need to spill over to GC allocation
    IdleCallbackDlg[] actualIdleCallbacks;
    __gshared Timer hangDetectorTimer;

    SignalHandlerValue!TscTimePoint fiberRunStartTime;
    void delegate() nothrow @nogc @safe crossFiberHook;
    FiberHandle crossFiberHookCaller; // FiberHandle to return to after performing the hook

    alias TimedCallbackGeneration = TypedIdentifier!("TimedCallbackGeneration", ulong, ulong.max, ulong.max);
    // A single registered timer entry, linked into `timeQueue`
    struct TimedCallback {
        TimedCallback* _next, _prev;
        timeQueue.OwnerAttrType _owner;
        TimedCallbackGeneration generation;
        TscTimePoint timePoint;
        ulong intervalCycles; // How many cycles between repetitions. Zero means non-repeating

        Closure closure;
    }

    // TODO change to mmap pool or something
    SimplePool!(TimedCallback) timedCallbacksPool;
    TimedCallbackGeneration.Allocator timedCallbackGeneration;
    CascadingTimeQueue!(TimedCallback*, TIMER_NUM_BINS, TIMER_NUM_LEVELS, true) timeQueue;

    ThreadPool!MAX_DEFERRED_TASKS threadPool;
    version(unittest) package ref auto utThreadPool() inout { return threadPool; }
648
649 public:
    /// Report whether the reactor has been properly opened (i.e. - `setup` has been called and `teardown`
    /// has not been called since).
    @property bool isOpen() const pure nothrow @safe @nogc {
        return _open;
    }
654
    /// Report whether the reactor is currently active
    ///
    /// Unlike `isRunning`, this will return `false` during the reactor shutdown (i.e. - once `stop` has
    /// marked the reactor as stopping).
    @property bool isActive() const pure nothrow @safe @nogc {
        return _running && !_stopping;
    }
661
    /// Report whether the reactor is currently running. Unlike `isActive`, the stopping flag is not consulted.
    @property bool isRunning() const pure nothrow @safe @nogc {
        return _running;
    }
666
    /**
      Set the reactor up for doing work.

      All options must be set before calling this function.

      Allocates the fiber stacks and fiber table, prepares the main and idle fibers, opens the timer
      structures and, depending on the options, the fault handlers, thread pool and default idle handler.
      Must be called from the main thread, on a reactor that is not already open.

      Params:
       options = the configuration to use; a copy is kept in `optionsInEffect`
     */
    void setup(OpenOptions options = OpenOptions.init) {
        INFO!"Setting up reactor"();

        assert (!isOpen, "reactor.setup called twice");
        _open = true;
        assert (thread_isMainThread);
        assert (options.numFibers > NUM_SPECIAL_FIBERS);
        reactorReturn = 0;
        optionsInEffect = options;

        // Fiber identities pack the slot index into the low maxNumFibersBits bits
        maxNumFibersBits = bitsInValue(optionsInEffect.numFibers - 1);
        maxNumFibersMask = createBitMask!(FiberIdx.UnderlyingType)(maxNumFibersBits);

        stats = Stats.init;
        stats.fibersHistogram[FiberState.None] = options.numFibers;

        // Round the requested stack size up to whole pages, plus one extra page that becomes the guard zone
        const stackPerFib = (((options.fiberStackSize + SYS_PAGE_SIZE - 1) / SYS_PAGE_SIZE) + 1) * SYS_PAGE_SIZE;
        fiberStacks.allocate(stackPerFib * options.numFibers);
        allFibers.allocate(options.numFibers);

        _thisFiber = null;
        criticalSectionNesting = 0;
        idleCallbacks.length = 0;
        actualIdleCallbacks = null;

        foreach(i, ref fib; allFibers) {
            auto stack = fiberStacks[i * stackPerFib .. (i + 1) * stackPerFib];
            // Make the first GUARD_ZONE_SIZE bytes of each allocation inaccessible
            errnoEnforce(mprotect(stack.ptr, GUARD_ZONE_SIZE, PROT_NONE) == 0, "Guard zone protection failed");
            // Slot 0 is the main fiber
            fib.setup(stack[GUARD_ZONE_SIZE .. $], i==0);

            // The special fibers (main and idle) are never handed out to users
            if (i >= NUM_SPECIAL_FIBERS) {
                freeFibers.append(&fib);
            }
        }

        mainFiber = &allFibers[MainFiberId.value];
        mainFiber.flag!"SPECIAL" = true;
        mainFiber.flag!"CALLBACK_SET" = true;
        mainFiber.state = FiberState.Running;
        setFiberName(mainFiber, "mainFiber", &mainloop);

        idleFiber = &allFibers[IdleFiberId.value];
        idleFiber.flag!"SPECIAL" = true;
        idleFiber.flag!"CALLBACK_SET" = true;
        idleFiber.params.fiberBody.set(&idleLoop);
        idleFiber.state = FiberState.Sleeping;
        setFiberName(idleFiber, "idleFiber", &idleLoop);

        timedCallbacksPool.open(options.numTimers, true);
        timeQueue.open(options.timerGranularity);

        if( options.faultHandlersEnabled )
            registerFaultHandlers();

        if( options.threadDeferralEnabled )
            threadPool.open(options.numThreadsInPool, options.threadStackSize);

        import mecca.reactor.io.fd;
        _openReactorEpoll();

        if(options.registerDefaultIdler) {
            import mecca.reactor.subsystems.poller;
            theReactor.registerIdleCallback(&poller.reactorIdle);
        }

        import mecca.reactor.io.signals;
        reactorSignal._open();

        enum TIMER_GRANULARITY = 4; // Number of wakeups during the monitored period
        // NOTE(review): with the default hangDetectorTimeout of zero this threshold is zero; presumably the
        // timer is only armed elsewhere when hang detection is enabled — confirm.
        Duration threshold = optionsInEffect.hangDetectorTimeout / TIMER_GRANULARITY;
        hangDetectorTimer = Timer(threshold, &hangDetectorHandler);
    }
744
    /**
      Shut the reactor down.

      Releases what `setup` acquired: closes the signal and epoll subsystems, tears down every fiber, closes
      the thread pool and fault handlers if they were enabled, frees the fiber table and stacks, closes the
      timer structures and resets the scheduling lists. The reactor must be open, not running, and outside
      any critical section.
     */
    void teardown() {
        INFO!"Tearing down reactor"();

        ASSERT!"reactor teardown called on non-open reactor"(isOpen);
        ASSERT!"reactor teardown called on still running reactor"(!isRunning);
        ASSERT!"reactor teardown called inside a critical section"(criticalSectionNesting==0);

        import mecca.reactor.io.signals;
        reactorSignal._close();

        import mecca.reactor.io.fd;
        _closeReactorEpoll();

        foreach(i, ref fib; allFibers) {
            fib.teardown();
        }

        // Only close what setup opened
        if( optionsInEffect.threadDeferralEnabled )
            threadPool.close();
        switchCurrExcBuf(null);

        // disableGCTracking();

        if( optionsInEffect.faultHandlersEnabled )
            deregisterFaultHandlers();

        allFibers.free();
        fiberStacks.free();
        timeQueue.close();
        timedCallbacksPool.close();

        // The lists held pointers into allFibers, which is gone now
        setToInit(freeFibers);
        setToInit(scheduledFibersNormal);
        setToInit(scheduledFibersHigh);
        setToInit(scheduledFibersImmediate);
        nothingScheduled = true;

        _thisFiber = null;
        mainFiber = null;
        idleFiber = null;
        idleCallbacks.length = 0;
        actualIdleCallbacks = null;

        _open = false;
    }
793
794 /**
795 Register an idle handler callback.
796
797 The reactor handles scheduling and fibers switching, but has no built-in mechanism for scheduling sleeping fibers
798 back to execution (except fibers sleeping on a timer, of course). Mechanisms such as file descriptor watching are
799 external, and are registered using this function.
800
801 The idler callback should receive a timeout variable. This indicates the time until the closest timer expires. If
802 no other event comes in, the idler should strive to wake up after that long. Waking up sooner will, likely, cause
803 the idler to be called again. Waking up later will delay timer tasks (which is allowed by the API contract).
804
805 It is allowed to register more than one idler callback. Doing so, however, will cause $(B all) of them to be
806 called with a timeout of zero (i.e. - don't sleep), resulting in a busy wait for events.
807
808 The idler is expected to return a boolean value indicating whether the time spent inside the idler is to be
809 counted as idle time, or whether that's considered "work". This affects the results returned by the `idleCycles`
810 field returned by the `reactorStats`.
811 */
812 void registerIdleCallback(IdleCallbackDlg dg) nothrow @safe {
813 // You will notice our deliberate lack of function to unregister
814 if( actualIdleCallbacks.length==idleCallbacks.capacity ) {
815 WARN!"Idle callbacks capacity reached - switching to GC allocated list"();
816 }
817
818 if( actualIdleCallbacks.length<idleCallbacks.capacity ) {
819 idleCallbacks ~= dg;
820 actualIdleCallbacks = idleCallbacks[];
821 } else {
822 actualIdleCallbacks ~= dg;
823 }
824
825 DEBUG!"%s idle callbacks registered"(actualIdleCallbacks.length);
826 }
827
828 /**
829 * Spawn a new fiber for execution.
830 *
831 * Parameters:
832 * The first argument must be the function/delegate to call inside the new fiber. If said callable accepts further
833 * arguments, then they must be provided as further arguments to spawnFiber.
834 *
835 * Returns:
836 * A FiberHandle to the newly created fiber.
837 */
838 @notrace FiberHandle spawnFiber(T...)(T args) nothrow @safe @nogc {
839 static assert(T.length>=1, "Must pass at least the function/delegate to spawnFiber");
840 static assert(isDelegate!(T[0]) || isFunctionPointer!(T[0]),
841 "spawnFiber first argument must be function or delegate");
842 static assert( is( ReturnType!(T[0]) == void ), "spawnFiber callback must be of type void" );
843 auto fib = _spawnFiber(false);
844 fib.params.fiberBody.set(args);
845 setFiberName(fib, "Fiber", args[0]);
846 return FiberHandle(fib);
847 }
848
    /**
     * Spawn a new fiber for execution.
     *
     * Unlike the overload taking the callable as a run-time argument, here the callable is a compile-time
     * alias.
     *
     * Params:
     *  F = The function or delegate to call inside the fiber.
     *  args = The arguments for F
     *
     * Returns:
     *  A FiberHandle to the newly created fiber.
     */
    @notrace FiberHandle spawnFiber(alias F)(Parameters!F args)
            if (!isType!F) {
        static assert( is( ReturnType!F == void ), "spawnFiber callback must be of type void" );
        auto fib = _spawnFiber(false);
        // NOTE(review): `move` is presumably referenced by the code generated in the mixin below — confirm
        // before removing this import.
        import std.algorithm: move;
        // pragma(msg, genMoveArgument( args.length, "fib.params.fiberBody.set!F", "args" ) );
        // The generated code refers to the local `fib` by name, so that name must not change
        mixin( genMoveArgument( args.length, "fib.params.fiberBody.set!F", "args" ) );

        setFiberName(fib, F.mangleof, &F);
        return FiberHandle(fib);
    }
870
    /// Returns whether currently running fiber is the idle fiber (pointer comparison against `idleFiber`).
    @property bool isIdle() pure const nothrow @safe @nogc {
        return thisFiber is idleFiber;
    }
875
    /// Returns whether currently running fiber is the main fiber (pointer comparison against `mainFiber`).
    @property bool isMain() pure const nothrow @safe @nogc {
        return thisFiber is mainFiber;
    }
880
    /// Returns whether currently running fiber is a special (i.e. - non-user) fiber, as recorded by its
    /// `SPECIAL` flag. `setup` sets that flag on the main and idle fibers.
    @property bool isSpecialFiber() const nothrow @safe @nogc {
        return thisFiber.flag!"SPECIAL";
    }
885
    /// Returns a FiberHandle to the currently running fiber.
    ///
    /// Debug builds assert that the current fiber is not a special fiber.
    @property FiberHandle currentFiberHandle() nothrow @safe @nogc {
        // XXX This assert may be incorrect, but it is easier to remove an assert than to add one
        DBG_ASSERT!"Should not blindly get fiber handle of special fiber %s"(!isSpecialFiber, currentFiberId);
        return FiberHandle(thisFiber);
    }
    /// Returns a pointer to the currently running fiber; asserts that it is not a special fiber.
    @property package ReactorFiber* currentFiberPtr() nothrow @safe @nogc {
        return getCurrentFiberPtr(false);
    }
    /// Returns a pointer to the currently running fiber.
    ///
    /// Params:
    ///  specialOkay = when false, asserts that the current fiber is not a special fiber
    @notrace private ReactorFiber* getCurrentFiberPtr(bool specialOkay) nothrow @safe @nogc {
        // XXX This assert may be incorrect, but it is easier to remove an assert than to add one
        ASSERT!"Should not blindly get fiber handle of special fibers %s"(specialOkay || !isSpecialFiber, currentFiberId);
        return thisFiber;
    }
    /**
      Returns the FiberId of the currently running fiber.

      You should almost never store the FiberId for later comparison or pass it to another fiber. Doing so risks
      having the current fiber die and another one spawned with the same FiberId. If that's what you want to do,
      use currentFiberHandle instead.
     */
    @property FiberId currentFiberId() const nothrow @safe @nogc {
        return thisFiber.identity;
    }
909
910 /// Returns the `FiberState` of the specified fiber.
911 FiberState getFiberState(FiberHandle fh) const nothrow @safe @nogc {
912 auto fiber = fh.get();
913
914 if( fiber is null )
915 return FiberState.None;
916
917 if( fiber.state==FiberState.Sleeping && fiber.flag!"SCHEDULED" )
918 return FiberState.Scheduled;
919
920 return fiber.state;
921 }
922
    /**
      Starts up the reactor.

      The reactor should already have at least one user fiber at that point, or it will start, but sit there and
      do nothing.

      This function "returns" only after the reactor is stopped and no more fibers are running.

      Returns:
      The return value from `start` is the value passed to the `stop` function.
     */
    int start() {
        // Mark the calling thread as the reactor thread for the duration of the run
        _isReactorThread = true;
        scope(exit) _isReactorThread = false;

        META!"Reactor started"();
        // idleFiber is only non-null between setup and teardown
        assert( idleFiber !is null, "Reactor started without calling \"setup\" first" );
        mainloop();
        META!"Reactor stopped"();

        return reactorReturn;
    }
944
/**
  Stop the reactor, killing all fibers.

  This will kill all running fibers and trigger a return from the original call to `Reactor.start`.

  Typically, this function call never returns (it throws `ReactorExit`). However, if the reactor is not active when
  `stop` is called, an error is logged and the call simply returns. It is, therefore, not wise to rely on either
  behavior.

  Params:
  reactorReturn = The return value to be returned from `start`
 */
void stop(int reactorReturn = 0) @safe @nogc {
    if( isActive ) {
        INFO!"Stopping reactor with exit code %s"(reactorReturn);
        this.reactorReturn = reactorReturn;
        _stopping = true;

        // Unless we already are the main fiber, make sure it gets scheduled
        if( !isMain() )
            resumeSpecialFiber(mainFiber);

        throw mkEx!ReactorExit();
    }

    ERROR!"Reactor.stop called, but reactor is not running"();
}
972
/**
  Enter a section of code in which fiber switching is not allowed.

  If the code tries to switch away from the current fiber before `leaveCriticalSection` is called, the reactor will
  fail an assert. This helps writing code that does interruption sensitive tasks without locks and making sure future
  changes don't break it.

  Critical sections are nestable.

  Also see `criticalSection` below.
 */
void enterCriticalSection() nothrow @safe @nogc {
    pragma(inline, true);
    // Nesting is tracked with a simple counter
    criticalSectionNesting++;
}
987
/// Leave the innermost critical section.
///
/// Asserts that a critical section is currently open, then decrements the nesting counter.
void leaveCriticalSection() nothrow @safe @nogc {
    pragma(inline, true);
    assert (criticalSectionNesting > 0);
    criticalSectionNesting--;
}
994
/// Reports whether execution is currently within a critical section,
/// i.e. whether the nesting counter is greater than zero.
@property bool isInCriticalSection() const pure nothrow @safe @nogc {
    return criticalSectionNesting > 0;
}
999
/** Make sure we are allowed to context switch from this point.
 *
 * This will be called automatically if an actual context switch is attempted. You might wish to call this function
 * explicitly, however, from contexts that $(I might) context switch, so that they fail even if they don't actually
 * attempt it, in accordance with the "fail early" doctrine.
 *
 * Params:
 *  message = text appended to the assertion message if the check fails.
 */
void assertMayContextSwitch(string message = "Simulated context switch") nothrow @safe @nogc {
    pragma(inline, true);
    // Two independent conditions: we must be on the reactor thread, and outside any critical section
    ASSERT!"Context switch from outside the reactor thread: %s"(isReactorThread, message);
    ASSERT!"Context switch while inside a critical section: %s"(!isInCriticalSection, message);
}
1011
/**
 * Return a RAII object handling a critical section
 *
 * The function enters a critical section. Unlike `enterCriticalSection`, however, there is no need to explicitly call
 * `leaveCriticalSection`. Instead, the function returns a variable that calls `leaveCriticalSection` when it goes out
 * of scope.
 *
 * There are two advantages for using `criticalSection` over enter/leaveCriticalSection. The first is for nicer scoping:
 * ---
 * with(theReactor.criticalSection) {
 *   // Code in critical section goes here
 * }
 * ---
 *
 * The second case is if you need to switch, within the same code, between critical section and none:
 * ---
 * auto cs = theReactor.criticalSection;
 *
 * // Some code
 * cs.leave();
 * // Some code that sleeps
 * cs.enter();
 *
 * // The rest of the critical section code
 * ---
 *
 * The main advantage over enter/leaveCriticalSection is that this way, the critical section never leaks. Any exception
 * thrown, either from the code inside the critical section or the code temporarily outside of it, will result in the
 * correct number of `leaveCriticalSection` calls to zero out the effect.
 *
 * Also note that there is no need to call `leave` at the end of the code. The destructor of `cs` will do it for us
 * $(B if necessary).
 */
@property auto criticalSection() nothrow @safe @nogc {
    pragma(inline, true);
    static struct CriticalSection {
    private:
        // Whether this object currently holds one level of critical section nesting
        bool owner = true;

    public:
        // Copying is disabled so that a single level of nesting is never released twice
        @disable this(this);
        ~this() nothrow @safe @nogc {
            // Only release if we still hold the section (i.e. `leave` was not already called)
            if( owner )
                leave();
        }

        /// Re-enter the critical section after a previous call to `leave`.
        void enter() nothrow @safe @nogc {
            DBG_ASSERT!"The same critical section was activated twice"(!owner);
            theReactor.enterCriticalSection();
            owner = true;
        }

        /// Leave the critical section ahead of the destructor.
        void leave() nothrow @safe @nogc {
            DBG_ASSERT!"Asked to leave critical section not owned by us"(owner);
            theReactor.leaveCriticalSection();
            owner = false;
        }
    }
    // The returned object starts out owning the section entered here (`owner` defaults to true)
    enterCriticalSection();
    return CriticalSection();
}
// Checks that yielding inside a critical section trips an assert, and that yielding is allowed again
// once the `with` scope has ended.
unittest {
    testWithReactor({
            {
                with( theReactor.criticalSection ) {
                    // A context switch attempt inside the critical section must fail
                    assertThrows!AssertError( theReactor.yield() );
                }
            }

            // Outside the critical section the same call must succeed
            theReactor.yield();
        });
}
1082
/**
 * Temporarily surrender the CPU for other fibers to run.
 *
 * Unlike suspend, the current fiber will automatically resume running after any currently scheduled fibers are finished.
 */
@notrace void yield() @safe @nogc {
    // Schedule ourselves first, then suspend: we are resumed once our turn in the queue comes around.
    // TODO both scheduled and running is not a desired state to be in other than here.
    resumeFiber(thisFiber);
    suspendCurrentFiber();
}
1093
/**
 * Tell whether the current fiber has been running for too long.
 *
 * Fibers that run for too long prevent other fibers from operating properly. On the other hand, fibers that initiate
 * a context switch needlessly load the system with overhead.
 *
 * This function reports whether the time elapsed since `fiberRunStartTime` exceeds the desired run time. It always
 * reports `true` while the reactor is stopping.
 *
 * The base time used is taken from `OpenOptions.maxDesiredRunTime`.
 *
 * Params:
 *  tolerance = A multiplier for the amount of acceptable run time. Specifying 4 here will give you 4 times as much
 *      time to run before a context switch is deemed necessary.
 */
@notrace bool shouldYield(uint tolerance = 1) const nothrow @safe @nogc {
    if( !_stopping ) {
        auto ranFor = TscTimePoint.hardNow() - fiberRunStartTime;
        auto allowance = tolerance*optionsInEffect.maxDesiredRunTime;
        return ranFor > allowance;
    }

    return true;
}
1115
/**
  Yield, but only if the fiber has been running long enough.

  Arguments and meaning are the same as for `shouldYield`.

  Returns:
  Whether a yield actually took place.
 */
@notrace bool considerYield(uint tolerance = 1) @safe @nogc {
    const yieldNeeded = shouldYield(tolerance);

    if( yieldNeeded )
        yield();

    return yieldNeeded;
}
1132
/**
 * Give a fiber temporary priority in execution.
 *
 * Setting fiber priority means that the next time this fiber is scheduled, it will be scheduled ahead of other
 * fibers already scheduled to be run.
 *
 * Returns a RAII object that sets the priority back when destroyed. Typically, that happens at the end of the scope.
 *
 * Can be used as:
 * ---
 * with(boostFiberPriority()) {
 *   // High priority stuff goes here
 * }
 * ---
 */
auto boostFiberPriority() nothrow @safe @nogc {
    static struct FiberPriorityRAII {
    private:
        // Handle of the fiber whose priority was raised
        FiberHandle fh;

    public:
        @disable this(this);
        this(FiberHandle fh) @safe @nogc nothrow {
            this.fh = fh;
        }

        ~this() @safe @nogc nothrow {
            // Nothing to undo if the handle is no longer valid
            if( fh.isValid ) {
                // The priority flag is cleared on the *current* fiber, so it must be the one that set it
                ASSERT!"Cannot move priority watcher between fibers (opened on %s)"(
                        fh == theReactor.currentFiberHandle, fh.fiberId);

                ASSERT!"Trying to reset priority which is not set"( theReactor.thisFiber.flag!"PRIORITY" );
                theReactor.thisFiber.flag!"PRIORITY" = false;
            }
        }
    }

    // Only user fibers may be prioritized, and boosts do not nest
    ASSERT!"Cannot ask to prioritize a non-user fiber"(!isSpecialFiber);
    ASSERT!"Asked to prioritize a fiber %s which is already high priority"(
            !thisFiber.flag!"PRIORITY", currentFiberId );
    DEBUG!"Setting fiber priority"();
    thisFiber.flag!"PRIORITY" = true;

    return FiberPriorityRAII(currentFiberHandle);
}
1178
/// Handle used to manage registered timers
struct TimerHandle {
private:
    // The registered callback, plus the generation it had when this handle was created. A mismatch
    // between the stored generation and the callback's current one marks the handle as stale.
    TimedCallback* callback;
    TimedCallbackGeneration generation;

public:

    /// Construct a handle referring to `callback`, which must not be null.
    this(TimedCallback* callback) nothrow @safe @nogc {
        DBG_ASSERT!"Constructing TimerHandle from null callback"(callback !is null);
        this.callback = callback;
        this.generation = callback.generation;
    }

    /// Returns whether the handle was used
    ///
    /// returns:
    /// `true` if the handle was set. `false` if it is still in its init value.
    @property bool isSet() const pure nothrow @safe @nogc {
        return callback !is null;
    }

    /// Returns whether the handle describes a currently registered task
    ///
    /// That requires the handle to be set, the reactor to be open, the callback to have an owner and
    /// the callback's generation to match the one recorded in the handle.
    @property bool isValid() const nothrow @safe @nogc {
        return
                isSet() &&
                theReactor.isOpen &&
                callback._owner !is null &&
                generation == callback.generation;
    }

    /// Revert the handle to init value, forgetting the timer it points to
    ///
    /// This call will $(B not) cancel the actual timer. Use `cancelTimer` for that.
    @notrace void reset() nothrow @safe @nogc {
        callback = null;
        generation = TimedCallbackGeneration.invalid;
    }

    /// Cancel a currently registered timer
    ///
    /// Does nothing to the reactor if the handle is not valid. In either case the handle is reset.
    void cancelTimer() nothrow @safe @nogc {
        if( isValid ) {
            theReactor.cancelTimerInternal(this);
        }

        reset();
    }
}
1227
/**
 * Registers a one-shot timer task.
 *
 * Params:
 *  F = the callable to be invoked when the timer expires
 *  timeout = when the timer is to be called
 *  params = the parameters to call F with
 *
 * Returns:
 *  A handle to the just registered timer.
 */
TimerHandle registerTimer(alias F)(Timeout timeout, Parameters!F params) nothrow @safe @nogc {
    TimedCallback* entry = allocTimedCallback();

    // No recurrence interval: this timer fires once
    entry.intervalCycles = 0;
    entry.timePoint = timeout.expiry;
    entry.closure.set(&F, params);

    timeQueue.insert(entry);
    return TimerHandle(entry);
}
1249
/// ditto
TimerHandle registerTimer(alias F)(Duration timeout, Parameters!F params) nothrow @safe @nogc {
    // Convert the duration to a Timeout and forward to the Timeout overload
    return registerTimer!F(Timeout(timeout), params);
}
1254
/**
 * Register a timer callback
 *
 * Same as `registerTimer!F`, except that the callback is passed as an argument rather than as an alias. The callback
 * cannot accept parameters.
 */
TimerHandle registerTimer(T)(Timeout timeout, T dg) nothrow @safe @nogc {
    TimedCallback* entry = allocTimedCallback();

    // No recurrence interval: this timer fires once
    entry.intervalCycles = 0;
    entry.timePoint = timeout.expiry;
    entry.closure.set(dg);

    timeQueue.insert(entry);
    return TimerHandle(entry);
}
1270
/// ditto
TimerHandle registerTimer(T)(Duration timeout, T dg) nothrow @safe @nogc {
    // Convert the duration to a Timeout and forward to the Timeout overload
    return registerTimer!T(Timeout(timeout), dg);
}
1275
/**
 * Registers a timer that will repeatedly trigger at set intervals.
 *
 * You do not control precisely when the callback is invoked, only how often. The invocations are going to be evenly
 * spaced out (best effort), but the first invocation might be almost immediately after the call or a whole
 * `interval` after.
 *
 * You can use the `firstRun` argument to control when the first invocation is going to be (but the same rule will
 * still apply to the second one).
 *
 * Params:
 *  interval = the frequency with which the callback will be called.
 *  dg = the callback to invoke
 *  F = an alias to the function to be called
 *  params = the arguments to pass to F on each invocation
 *  firstRun = if supplied, directly sets when is the first time the timer shall run. The value does not have any
 *      special constraints.
 */
TimerHandle registerRecurringTimer(Duration interval, void delegate() dg) nothrow @safe @nogc {
    // The helper allocates and schedules the callback; the closure is attached afterwards
    TimedCallback* callback = _registerRecurringTimer(interval);
    callback.closure.set(dg);
    return TimerHandle(callback);
}
1299
/// ditto
TimerHandle registerRecurringTimer(alias F)(Duration interval, Parameters!F params) nothrow @safe @nogc {
    // The helper allocates and schedules the callback; the closure is attached afterwards
    TimedCallback* callback = _registerRecurringTimer(interval);
    callback.closure.set(&F, params);
    return TimerHandle(callback);
}
1306
/// ditto
TimerHandle registerRecurringTimer(Duration interval, void delegate() dg, Timeout firstRun) nothrow @safe @nogc {
    TimedCallback* entry = allocRecurringTimer(interval);

    // The first invocation is placed explicitly at `firstRun`
    entry.closure.set(dg);
    entry.timePoint = firstRun.expiry;
    timeQueue.insert(entry);

    return TimerHandle(entry);
}
1317
/// ditto
TimerHandle registerRecurringTimer(alias F)(Duration interval, Parameters!F params, Timeout firstRun) nothrow @safe @nogc {
    TimedCallback* entry = allocRecurringTimer(interval);

    // The first invocation is placed explicitly at `firstRun`
    entry.closure.set(&F, params);
    entry.timePoint = firstRun.expiry;
    timeQueue.insert(entry);

    return TimerHandle(entry);
}
1328
// Cancels the timer referenced by `handle`: removes its callback from the time queue and returns the
// callback to the pool. The handle must be valid.
private void cancelTimerInternal(TimerHandle handle) nothrow @safe @nogc {
    DBG_ASSERT!"cancelTimerInternal called with invalid handle"( handle.isValid );
    timeQueue.cancel(handle.callback);
    timedCallbacksPool.release(handle.callback);
}
1334
/**
 * Schedule a callback for out of bounds immediate execution.
 *
 * For all intents and purposes, `call` is identical to `registerTimer` with an expired timeout.
 *
 * Returns:
 *  The handle of the timer that was registered.
 */
TimerHandle call(alias F)(Parameters!F params) nothrow @safe @nogc {
    return registerTimer!F(Timeout.elapsed, params);
}
1343
/// ditto
TimerHandle call(T)(T dg) nothrow @safe @nogc {
    // Registered as a timer whose timeout has already elapsed
    return registerTimer(Timeout.elapsed, dg);
}
1348
/// Suspend the current fiber for a specified amount of time
void sleep(Duration duration) @safe @nogc {
    // Forward to the Timeout overload
    sleep(Timeout(duration));
}
1353
/// ditto
void sleep(Timeout until) @safe @nogc {
    assert(until != Timeout.init, "sleep argument uninitialized");
    // A timer resumes this fiber once `until` expires
    auto timerHandle = registerTimer!resumeFiber(until, currentFiberHandle, false);
    // If the suspend exits with an exception, the wake-up timer must not be left registered
    scope(failure) timerHandle.cancelTimer();

    suspendCurrentFiber();
}
1362
/**
  Resume a suspended fiber

  You are heartily encouraged not to use this function directly. In almost all cases, it is better to use one of the
  synchronization primitives instead.

  If the handle no longer refers to a fiber, the call does nothing.

  Params:
  handle = the handle of the fiber to be resumed.
  priority = If true, schedule the fiber before all currently scheduled fibers. If the fiber is already scheduled
  (`getFiberState` returns `Scheduled`), this will move it to the top of the queue.
 */
@notrace void resumeFiber(FiberHandle handle, bool priority = false) nothrow @safe @nogc {
    // Resolve the handle once and reuse the result, rather than looking it up a second time
    auto fiber = handle.get();

    if( fiber !is null )
        resumeFiber(fiber, priority);
}
1380
/**
  Suspend the current fiber

  If `timeout` is given and expires, the suspend will throw a `TimeoutExpired` exception.

  You are heartily encouraged not to use this function directly. In almost all cases, it is better to use one of the
  synchronization primitives instead.
 */
@notrace void suspendCurrentFiber(Timeout timeout) @trusted @nogc {
    // An infinite timeout needs no timer at all
    if (timeout == Timeout.infinite)
        return suspendCurrentFiber();

    assertMayContextSwitch("suspendCurrentFiber");

    TimerHandle timeoutHandle;
    // However we leave this function, the timer (if still registered) is cancelled
    scope(exit) timeoutHandle.cancelTimer();
    bool timeoutExpired;

    // An already elapsed timeout fails immediately, without switching
    if (timeout == Timeout.elapsed) {
        throw mkEx!TimeoutExpired;
    }

    // Timer callback. Receives pointers to `timeoutHandle` and `timeoutExpired` above.
    static void resumer(FiberHandle fibHandle, TimerHandle* cookie, bool* timeoutExpired) nothrow @safe @nogc{
        // Clear the caller's handle so its scope(exit) does not try to cancel this timer
        *cookie = TimerHandle.init;
        ReactorFiber* fib = fibHandle.get;
        assert( fib !is null, "Fiber disappeared while suspended with timer" );

        // Throw TimeoutExpired only if we're the ones who resumed the fiber. this prevents a race when
        // someone else had already woken the fiber, but it just didn't get time to run while the timer expired.
        // this probably indicates fibers hogging the CPU for too long (starving others)
        *timeoutExpired = ! fib.flag!"SCHEDULED";

        /+
                    if (! *timeoutExpired)
                        fib.WARN_AS!"#REACTOR fiber resumer invoked, but fiber already scheduled (starvation): %s scheduled, %s pending"(
                                theReactor.scheduledFibers.length, theReactor.pendingFibers.length);
         +/

        theReactor.resumeFiber(fib);
    }

    timeoutHandle = registerTimer!resumer(timeout, currentFiberHandle, &timeoutHandle, &timeoutExpired);
    switchToNext();

    // Set by `resumer` only when the timer was the one to wake us up
    if( timeoutExpired )
        throw mkEx!TimeoutExpired();
}
1428
/// ditto
@notrace void suspendCurrentFiber() @safe @nogc {
    // No timeout: simply switch to the next fiber
    switchToNext();
}
1433
/**
 * Run a function inside a different thread.
 *
 * Runs a function inside a thread. Use this function to run CPU bound processing without freezing the other fibers.
 *
 * `Fini`, if provided, will be called with the same parameters in the reactor thread once the thread has finished.
 * `Fini` will be called unconditionally, even if the fiber that launched the thread terminates before the thread
 * has finished.
 *
 * The `Fini` function runs within a `criticalSection`, and must not sleep.
 *
 * Returns:
 * The return argument from the delegate is returned by this function.
 *
 * Throws:
 * Will throw TimeoutExpired if the timeout expires.
 *
 * Will rethrow whatever the thread throws in the waiting fiber.
 */
auto deferToThread(alias F, alias Fini = null)(Parameters!F args, Timeout timeout = Timeout.infinite) @nogc {
    DBG_ASSERT!"deferToThread called but thread deferral isn't enabled in the reactor"(
            optionsInEffect.threadDeferralEnabled);
    // Note the argument order: the thread pool takes the timeout first
    return threadPool.deferToThread!(F, Fini)(timeout, args);
}
1458
/// ditto
auto deferToThread(F)(scope F dlg, Timeout timeout = Timeout.infinite) @nogc {
    DBG_ASSERT!"deferToThread called but thread deferral isn't enabled in the reactor"(optionsInEffect.threadDeferralEnabled);
    // Adapter so that the callable can be handed to the alias-based thread pool API as an argument
    static auto glueFunction(F dlg) {
        return dlg();
    }

    return threadPool.deferToThread!glueFunction(timeout, dlg);
}
1468
/**
 * Forward an exception to another fiber
 *
 * This function throws an exception in another fiber. The fiber will be scheduled to run.
 *
 * There is a difference in semantics between the two forms. The first form throws an identical copy of the exception
 * in the target fiber. The second form forms a new exception to be thrown in that fiber.
 *
 * One place where this difference matters is with the stack trace the thrown exception will have. With the first
 * form, the stack trace will be the stack trace `ex` has, wherever it is (usually not in the fiber in which the
 * exception was thrown). With the second form, the stack trace will be of the target fiber, which means it will point
 * back to where the fiber went to sleep.
 *
 * Returns:
 *  `false` if the handle is not valid or no exception buffer could be prepared for the fiber, `true` otherwise.
 */
bool throwInFiber(FiberHandle fHandle, Throwable ex) nothrow @safe @nogc {
    if( fHandle.isValid ) {
        ExcBuf* pending = prepThrowInFiber(fHandle, false);

        if( pending !is null ) {
            pending.set(ex);
            // Schedule the target ahead of everything else so that it receives the exception
            resumeFiber(fHandle.get(), true);
            return true;
        }
    }

    return false;
}
1496
/// ditto
bool throwInFiber(T : Throwable, string file = __FILE_FULL_PATH__, size_t line = __LINE__, A...)
        (FiberHandle fHandle, auto ref A args) nothrow @safe @nogc
{
    pragma(inline, true);
    if( fHandle.isValid ) {
        ExcBuf* pending = prepThrowInFiber(fHandle, true);

        if( pending !is null ) {
            // The exception object is constructed in place inside the fiber's exception buffer
            pending.construct!T(file, line, false, args);
            resumeFiber(fHandle.get(), true);
            return true;
        }
    }

    return false;
}
1515
/** Request that a GC collection take place ASAP
 *
 * The request is ignored while the reactor is stopping.
 *
 * Params:
 *  waitForCollection = whether to wait for the collection to finish before returning. Waiting is done by yielding,
 *      and so must not be requested from a special fiber.
 */
void requestGCCollection(bool waitForCollection = true) @safe @nogc {
    if( theReactor._stopping )
        return;

    DEBUG!"Requesting explicit GC collection"();
    requestGCCollectionInternal(true);

    if( waitForCollection ) {
        // The assert fires when a special fiber asks for a synchronous collection
        DBG_ASSERT!"Special fiber must not request synchronous GC collection"( !isSpecialFiber );
        yield();
    }
}
1532
// Records that a GC collection is needed (and whether it is forced), and schedules the main fiber
// unless the request was made from the main fiber itself.
private void requestGCCollectionInternal(bool force) nothrow @safe @nogc {
    _gcCollectionNeeded = true;
    // NOTE(review): this overwrites any earlier pending `force` value - confirm that is intended
    _gcCollectionForce = force;
    if( theReactor.currentFiberId != MainFiberId ) {
        theReactor.resumeSpecialFiber(theReactor.mainFiber);
    }
}
1540
/// Property for disabling/enabling the hang detector.
///
/// The hang detector must be configured during `setup` by setting `OpenOptions.hangDetectorTimeout`.
@property bool hangDetectorEnabled() pure const nothrow @safe @nogc {
    return _hangDetectorEnabled;
}
1547
/// ditto
@property void hangDetectorEnabled(bool enabled) pure nothrow @safe @nogc {
    // Enabling is only allowed when a non-zero hang detector timeout is in effect; disabling always is
    ASSERT!"Cannot enable an unconfigured hang detector"(
            !enabled || optionsInEffect.hangDetectorTimeout !is Duration.zero);
    _hangDetectorEnabled = enabled;
}
1554
/// Iterate all fibers
///
/// Returns a range of `FiberHandle`s for the living non-special fibers. The range holds a critical section
/// object as a member, obtained when the range is constructed.
auto iterateFibers() const nothrow @safe @nogc {
    static struct FibersIterator {
    private:
        // Number of living fibers not yet reported
        uint numLivingFibers;
        // Scanning starts right after the special fibers
        FiberIdx idx = NUM_SPECIAL_FIBERS;
        ReturnType!(Reactor.criticalSection) criticalSection;

        this(uint numFibers) {
            numLivingFibers = numFibers;
            this.criticalSection = theReactor.criticalSection();

            if( numLivingFibers>0 ) {
                findNextFiber();
            }
        }

        // Advance `idx` until it refers to a living fiber. Only called while one is still expected.
        void findNextFiber() @safe @nogc nothrow {
            while( ! to!(ReactorFiber*)(idx).isAlive ) {
                idx++;
                DBG_ASSERT!"Asked for next living fiber but none was found"(
                        idx<theReactor.optionsInEffect.numFibers);
            }
        }
    public:
        @property bool empty() const pure @safe @nogc nothrow {
            return numLivingFibers==0;
        }

        void popFront() @safe @nogc nothrow {
            ASSERT!"Popping fiber from empty list"(!empty);
            numLivingFibers--;
            idx++;

            // Do not scan past the last living fiber
            if( !empty )
                findNextFiber;
        }

        @property FiberHandle front() @safe @nogc nothrow {
            auto fib = &theReactor.allFibers[idx.value];
            DBG_ASSERT!"Scanned fiber %s is not alive"(fib.isAlive, idx);

            return FiberHandle(fib);
        }
    }

    return FibersIterator(cast(uint) reactorStats.numUsedFibers);
}
1603
/// Iterate the fibers currently scheduled at the given priority
///
/// Params:
///  priority = selects which scheduling queue to iterate.
///
/// Returns:
///  A range whose elements are `FiberHandle`s of the fibers in the selected queue.
auto iterateScheduledFibers(FiberPriorities priority) nothrow @safe @nogc {
    // Wraps the queue's own range so that callers receive FiberHandles rather than the elements of
    // the internal list.
    @notrace static struct Range {
        private typeof(scheduledFibersNormal.range()) fibersRange;

        @property FiberHandle front() nothrow @nogc {
            return FiberHandle(fibersRange.front);
        }

        @property bool empty() const pure nothrow @nogc {
            return fibersRange.empty;
        }

        @notrace void popFront() nothrow {
            fibersRange.popFront();
        }
    }

    with(FiberPriorities) final switch(priority) {
    case NORMAL:
        return Range(scheduledFibersNormal.range());
    case HIGH:
        return Range(scheduledFibersHigh.range());
    case IMMEDIATE:
        return Range(scheduledFibersImmediate.range());
    }

    assert(false, "Priority must be member of FiberPriorities");
}
1632
/**
 * Set a fiber name
 *
 * Used by certain diagnostic functions to distinguish the different fiber types and create histograms.
 * Arguments bear no specific meaning, but default to describing the function used to start the fiber.
 *
 * The handle must be valid.
 */
@notrace void setFiberName(FiberHandle fh, string name, void *ptr) nothrow @safe @nogc {
    DBG_ASSERT!"Trying to set fiber name of an invalid fiber"(fh.isValid);
    setFiberName(fh.get, name, ptr);
}
1643
/// ditto
@notrace void setFiberName(T)(FiberHandle fh, string name, scope T dlg) nothrow @safe @nogc if( isDelegate!T ) {
    // The delegate's context pointer is used as the fiber pointer
    setFiberName(fh, name, dlg.ptr);
}
1648
/**
 * Temporarily change the fiber's name
 *
 * Meaning of arguments is as for `setFiberName`.
 *
 * returns:
 * A voldemort type whose destructor returns the fiber name to the one it had before. It also has a `release`
 * function for returning the name earlier.
 */
auto pushFiberName(string name, void *ptr) nothrow @safe @nogc {
    static struct PrevName {
    private:
        // The name and pointer the fiber had before `pushFiberName` changed them
        string name;
        void* ptr;

    public:
        @disable this();
        @disable this(this);

        this(string name, void* ptr) nothrow @safe @nogc {
            this.name = name;
            this.ptr = ptr;
        }

        ~this() nothrow @safe @nogc {
            // Both fields are nulled by `release`, so this skips objects that were already released
            if( name !is null || ptr !is null )
                release();
        }

        /// Restore the saved name and pointer on the current fiber.
        void release() nothrow @safe @nogc {
            theReactor.setFiberName( theReactor.thisFiber, this.name, this.ptr );

            name = null;
            ptr = null;
        }
    }

    // Capture the values currently in effect *before* overwriting them, so that they can be restored
    auto prevName = PrevName( getFiberName(), getFiberPtr() );
    setFiberName( theReactor.thisFiber, name, ptr );

    import std.algorithm: move;
    return move(prevName);
}
1692
/// ditto
@notrace auto pushFiberName(T)(string name, scope T dlg) nothrow @safe @nogc if( isDelegate!T ) {
    // The delegate's context pointer is used as the fiber pointer
    return pushFiberName(name, dlg.ptr);
}
1697
/// Retrieve the fiber name set by `setFiberName`
///
/// The overload taking a handle requires the handle to be valid. The overload without arguments reports on
/// the current fiber.
@notrace string getFiberName(FiberHandle fh) nothrow @safe @nogc {
    DBG_ASSERT!"Trying to get fiber name of an invalid fiber"(fh.isValid);
    return fh.get.params.fiberName;
}
1703
/// ditto
@notrace string getFiberName() nothrow @safe @nogc {
    // No handle given: report on the current fiber
    return thisFiber.params.fiberName;
}
1708
/// Retrieve the fiber pointer set by `setFiberName`
///
/// The overload taking a handle requires the handle to be valid. The overload without arguments reports on
/// the current fiber.
@notrace void* getFiberPtr(FiberHandle fh) nothrow @safe @nogc {
    DBG_ASSERT!"Trying to get fiber pointer of an invalid fiber"(fh.isValid);
    return fh.get.params.fiberPtr;
}
1714
/// ditto
@notrace void* getFiberPtr() nothrow @safe @nogc {
    // No handle given: report on the current fiber
    return thisFiber.params.fiberPtr;
}
1719
/**
 * Wait until the given fiber finishes
 *
 * If the handle no longer refers to a fiber there is nothing to wait for; the function then only verifies that a
 * context switch would have been allowed at this point.
 *
 * Params:
 *  fh = handle of the fiber to wait for
 *  timeout = how long to wait
 */
@notrace void joinFiber(FiberHandle fh, Timeout timeout = Timeout.infinite) @safe @nogc {
    ReactorFiber* target = fh.get();

    if( target !is null ) {
        target.params.joinWaiters.wait(timeout);
        DBG_ASSERT!"Fiber handle %s is valid after signalling done"(!fh.isValid, fh);
        return;
    }

    assertMayContextSwitch();
}
1734
/// Report the current reactor statistics.
///
/// The result is a copy of the internal statistics in which each special fiber has been subtracted from the
/// histogram bucket of the state it is currently in.
@property Stats reactorStats() const nothrow @safe @nogc {
    Stats snapshot = stats;

    foreach( FiberIdx.UnderlyingType slot; 0..NUM_SPECIAL_FIBERS ) {
        auto specialIdx = FiberIdx(slot);
        auto special = to!(ReactorFiber*)(specialIdx);

        // The bucket must account for this fiber before we take it out
        DBG_ASSERT!"%s is in state %s with histogram of 0"(
                snapshot.fibersHistogram[special.state]>0, special.identity, special.state );
        snapshot.fibersHistogram[special.state]--;
    }

    return snapshot;
}
1749
1750 private:
// Returns the currently running fiber (`_thisFiber`). Debug-asserts that the reactor is running.
package @property inout(ReactorFiber)* thisFiber() inout nothrow pure @safe @nogc {
    DBG_ASSERT!"No current fiber as reactor was not started"(isRunning);
    return _thisFiber;
}
1755
// Allocates a TimedCallback from the pool and stamps it with the next generation value.
// The reactor must be open.
@notrace TimedCallback* allocTimedCallback() nothrow @safe @nogc {
    DBG_ASSERT!"Registering timer on non-open reactor"(isOpen);
    auto ret = timedCallbacksPool.alloc();
    // TimerHandle compares this generation against its own copy in `isValid`
    ret.generation = timedCallbackGeneration.getNext();

    return ret;
}
1763
// Allocates a callback for a recurring timer and sets its interval (in cycles). Intervals shorter than the
// configured timer granularity are raised to that granularity. The callback is NOT inserted into the time queue.
TimedCallback* allocRecurringTimer(Duration interval) nothrow @safe @nogc {
    TimedCallback* entry = allocTimedCallback();

    const granularity = optionsInEffect.timerGranularity;
    const effective = interval<granularity ? granularity : interval;

    entry.intervalCycles = TscTimePoint.toCycles(effective);
    return entry;
}
1772
// Allocates a recurring timer callback for `interval` and hands it to `rescheduleRecurringTimer`.
// The caller is expected to set the closure on the returned callback.
TimedCallback* _registerRecurringTimer(Duration interval) nothrow @safe @nogc {
    TimedCallback* callback = allocRecurringTimer(interval);
    rescheduleRecurringTimer(callback);
    return callback;
}
1778
// Reports whether the time queue says that zero cycles remain until its next entry.
@property bool shouldRunTimedCallbacks() nothrow @safe @nogc {
    return timeQueue.cyclesTillNextEntry(TscTimePoint.hardNow()) == 0;
}
1782
// Picks the next fiber to run and switches to it.
//
// Selection order: the immediate queue first, then the high priority queue (limited by
// HIGH_PRIORITY_SCHEDULES_RATIO while normal fibers are waiting), then the normal queue, and finally the
// idle fiber when all queues are empty.
void switchToNext() @safe @nogc {
    //DEBUG!"SWITCH out of %s"(thisFiber.identity);
    assertMayContextSwitch("Context switch");

    stats.numContextSwitches++;

    // in source fiber
    {
        auto now = TscTimePoint.hardNow;
        // Warn about user fibers that ran for too long without switching
        if( !thisFiber.flag!"SPECIAL" ) {
            auto fiberRunTime = now - fiberRunStartTime;
            if( fiberRunTime >= optionsInEffect.hoggerWarningThreshold ) {
                WARN!"#HOGGER detected: Fiber %s ran for %sms"(thisFiber.identity, fiberRunTime.total!"msecs");
                // TODO: Add dumping of stack trace
            }
        }

        // The run time of whichever fiber runs next is measured from this point
        fiberRunStartTime = now;

        // If timed callbacks are due, make sure the main fiber is scheduled
        if (thisFiber !is mainFiber && !mainFiber.flag!"SCHEDULED" && shouldRunTimedCallbacks()) {
            resumeSpecialFiber(mainFiber);
        }

        ReactorFiber* nextFiber;

        if( !scheduledFibersImmediate.empty ) {
            nextFiber = scheduledFibersImmediate.popHead();
        } else if(
                !scheduledFibersHigh.empty &&
                ( highPrioritySchedules<HIGH_PRIORITY_SCHEDULES_RATIO || scheduledFibersNormal.empty))
        {
            // High priority fibers go first, up to the ratio, or without limit if nothing normal is waiting
            nextFiber = scheduledFibersHigh.popHead();
            highPrioritySchedules++;
        } else if( !scheduledFibersNormal.empty ) {
            nextFiber = scheduledFibersNormal.popHead();
            if( !scheduledFibersHigh.empty ) {
                ERROR!"Scheduled normal priority fiber %s over high priority %s to prevent starvation"(
                        nextFiber.identity, scheduledFibersHigh.head.identity);
            }

            // A normal fiber got its turn; the high priority allowance starts over
            highPrioritySchedules = 0;
        } else {
            // Nothing is waiting to run: fall back to the idle fiber
            DBG_ASSERT!"Idle fiber scheduled but all queues empty"( !idleFiber.flag!"SCHEDULED" );
            nextFiber = idleFiber;
            idleFiber.flag!"SCHEDULED" = true;
            nothingScheduled = true;
        }

        // A running fiber goes to sleep; the only other state allowed here is Done, which becomes None
        if( thisFiber.state==FiberState.Running )
            thisFiber.state = FiberState.Sleeping;
        else {
            assertEQ( thisFiber.state, FiberState.Done, "Fiber is in incorrect state" );
            thisFiber.state = FiberState.None;
        }

        DBG_ASSERT!"Couldn't decide on a fiber to schedule"(nextFiber !is null);

        ASSERT!"Next fiber %s is not marked scheduled" (nextFiber.flag!"SCHEDULED", nextFiber.identity);
        nextFiber.flag!"SCHEDULED" = false;
        DBG_ASSERT!"%s is in state %s, should be Sleeping or Starting"(
                nextFiber.state==FiberState.Sleeping || nextFiber.state==FiberState.Starting,
                nextFiber.identity, nextFiber.state);
        nextFiber.state = FiberState.Running;

        // DEBUG!"Switching %s => %s"(thisFiber.identity, nextFiber.identity);
        // The chosen fiber may be ourselves, in which case no actual switch is needed
        if (thisFiber !is nextFiber) {
            // make the switch
            switchTo(nextFiber);
        }
    }
}
1854
// Performs the actual switch from the current fiber to `nextFiber`, updating `_thisFiber` first.
void switchTo(ReactorFiber* nextFiber) @safe @nogc {
    auto currentFiber = thisFiber;
    _thisFiber = nextFiber;

    currentFiber.switchTo(nextFiber);

    // After returning
    /+
       Important note:
       Any code you place here *must* be replicated at the beginning of ReactorFiber.wrapper. Fibers launched
       for the first time do not return from `switchTo` above.
     +/

    // DEBUG!"SWITCH into %s"(thisFiber.identity);

    // This might throw, so it needs to be the last thing we do
    thisFiber.switchInto();
}
1873
// Called when a (non-special) fiber has finished: returns the fiber to the free list and switches away.
//
// Returns:
//  `true` if a FiberInterrupt was caught while switching, `false` otherwise.
bool fiberTerminated() nothrow @notrace {
    ASSERT!"special fibers must never terminate" (!thisFiber.flag!"SPECIAL");

    freeFibers.prepend(thisFiber);

    bool skipBody = false;
    try {
        // Wait for next incarnation of fiber
        switchToNext();
    } catch (FiberInterrupt ex) {
        INFO!"Fiber %s killed by FiberInterrupt exception %s"(currentFiberId, ex.msg);
        skipBody = true;
    } catch (Throwable ex) {
        // Anything else is passed on to the main fiber
        ERROR!"switchToNext on %s fiber %s failed with exception %s"(
                thisFiber.state==FiberState.Running ? "just starting" : "dead", currentFiberId, ex.msg);
        theReactor.forwardExceptionToMain(ex);
        assert(false);
    }

    return skipBody;
}
1895
/**
 * Schedule a special fiber at the head of the immediate queue.
 *
 * Does nothing if the fiber is already flagged as scheduled.
 *
 * Params:
 *  fib = the fiber to resume. Must be flagged SPECIAL and have its callback set.
 */
void resumeSpecialFiber(ReactorFiber* fib) nothrow @safe @nogc {
    DBG_ASSERT!"Asked to resume special fiber %s which isn't marked special" (fib.flag!"SPECIAL", fib.identity);
    DBG_ASSERT!"Asked to resume special fiber %s with no body set" (fib.flag!"CALLBACK_SET", fib.identity);
    DBG_ASSERT!"Special fiber %s scheduled not in head of list" (
            !fib.flag!"SCHEDULED" || scheduledFibersImmediate.head is fib, fib.identity);

    if (fib.flag!"SCHEDULED")
        return;

    fib.flag!"SCHEDULED" = true;
    scheduledFibersImmediate.prepend(fib);
    nothingScheduled = false;
}
1908
/**
 * Place a (non special) fiber on one of the scheduling queues.
 *
 * The queue is the immediate one if `immediate` is set, the high priority one if the fiber carries the PRIORITY
 * flag, and the normal one otherwise. A fiber that is already scheduled is left where it is, except when
 * `immediate` is requested and it is not yet in the immediate queue, in which case it is moved there.
 *
 * Params:
 *  fib = the fiber to resume
 *  immediate = whether to use the immediate queue
 */
void resumeFiber(ReactorFiber* fib, bool immediate = false) nothrow @safe @nogc {
    DBG_ASSERT!"Cannot resume a special fiber %s using the standard resumeFiber" (!fib.flag!"SPECIAL", fib.identity);
    ASSERT!"resumeFiber called on %s, which does not have a callback set"(fib.flag!"CALLBACK_SET", fib.identity);

    if( immediate && fib.flag!"SCHEDULED" && fib !in scheduledFibersImmediate ) {
        // Scheduled, but on some other queue. Take it out so that the code below re-queues it
        fib._owner.remove(fib);
        fib.flag!"SCHEDULED" = false;
    }

    if( fib.flag!"SCHEDULED" )
        return;

    typeof(scheduledFibersNormal)* targetQueue;
    if( immediate )
        targetQueue = &scheduledFibersImmediate;
    else if( fib.flag!"PRIORITY" )
        targetQueue = &scheduledFibersHigh;
    else
        targetQueue = &scheduledFibersNormal;

    if( fib._owner !is null ) {
        // Whatever list the fiber was waiting on is no longer relevant
        fib._owner.remove(fib);
    }

    fib.flag!"SCHEDULED" = true;
    targetQueue.append(fib);
    nothingScheduled = false;
}
1937
/**
 * Take a fiber off the free list, reset its bookkeeping and schedule it.
 *
 * Asserts that the free list is not empty.
 *
 * Params:
 *  immediate = passed on to `resumeFiber`
 *
 * Returns: the fiber that was prepared, now in the `Starting` state.
 */
ReactorFiber* _spawnFiber(bool immediate) nothrow @safe @nogc {
    ASSERT!"No more free fibers in pool"(!freeFibers.empty);

    auto spawned = freeFibers.popHead();
    assert (!spawned.flag!"CALLBACK_SET");

    spawned.flag!"CALLBACK_SET" = true;
    spawned.state = FiberState.Starting;

    // Detach from any list the previous incarnation may have been linked into
    spawned._prevId = FiberIdx.invalid;
    spawned._nextId = FiberIdx.invalid;
    spawned._owner = null;

    spawned.params.flsBlock.reset();

    resumeFiber(spawned, immediate);
    return spawned;
}
1951
/**
 * Endless loop that runs for as long as `nothingScheduled` is set.
 *
 * Each round fires the timers that are due. If none fired it calls the registered idle callbacks, or sleeps until
 * the next timer when there are no callbacks. Cycles spent idling are accumulated in `stats.idleCycles`. Once
 * something is scheduled, the loop switches to the next fiber.
 */
void idleLoop() {
    for (;;) {
        TscTimePoint start, end;
        end = start = TscTimePoint.hardNow;

        while (nothingScheduled) {
            auto critSect = criticalSection();

            /*
               "end" was sampled before the timers are called, so the timers do not count as idle time, unless...
               nothing is scheduled after running them, in which case they do.
             */
            if( runTimedCallbacks(end) )
                continue;

            // Getting here means runTimedCallbacks did nothing, so "end" is recent enough
            Duration sleepDuration = timeQueue.timeTillNextEntry(end);
            bool countsAsIdle = true;

            if( actualIdleCallbacks.length==0 ) {
                //DEBUG!"Idle fiber called with no callbacks, sleeping %sus"(sleepDuration.total!"usecs");
                import core.thread; Thread.sleep(sleepDuration);
            } else if( actualIdleCallbacks.length==1 ) {
                // A single callback is handed the whole time until the next timer
                countsAsIdle = actualIdleCallbacks[0](sleepDuration) && countsAsIdle;
            } else {
                // Several callbacks: each one is called with a zero duration
                foreach(cb; actualIdleCallbacks) {
                    with( pushFiberName("Idle callback", cb) ) {
                        countsAsIdle = cb(ZERO_DURATION) && countsAsIdle;
                    }
                }
            }

            if( countsAsIdle ) {
                end = TscTimePoint.hardNow;
            } else if( nothingScheduled ) {
                // Another round is coming, but this one must not be counted as idle time
                stats.idleCycles += end.diff!"cycles"(start);
                end = start = TscTimePoint.hardNow;
            }
        }

        stats.idleCycles += end.diff!"cycles"(start);
        switchToNext();
    }
}
1997
/**
 * Run every timed callback that is due at `now`.
 *
 * One-shot callbacks are released back to the pool after running; recurring ones (non zero `intervalCycles`) are
 * rescheduled.
 *
 * Params:
 *  now = the time to pop expired entries for
 *
 * Returns: `true` if at least one callback was run.
 */
@notrace bool runTimedCallbacks(TscTimePoint now = TscTimePoint.hardNow) {
    // Timer callbacks are not allowed to sleep
    auto criticalSectionContainer = criticalSection();

    bool ranSomething;

    for( TimedCallback* expired = timeQueue.pop(now); expired !is null; expired = timeQueue.pop(now) ) {
        expired.closure();

        if( expired.intervalCycles!=0 )
            rescheduleRecurringTimer(expired);
        else
            timedCallbacksPool.release(expired);

        ranSomething = true;
    }

    return ranSomething;
}
2017
/**
 * Put a recurring timer back in the time queue for its next run.
 *
 * The new time point is "now plus one interval", rounded down to a whole multiple of the interval.
 *
 * Params:
 *  callback = the recurring timer to re-insert
 */
void rescheduleRecurringTimer(TimedCallback* callback) nothrow @safe @nogc {
    const ulong unaligned = TscTimePoint.hardNow.cycles + callback.intervalCycles;
    const ulong aligned = unaligned - unaligned % callback.intervalCycles;
    callback.timePoint = TscTimePoint(aligned);

    timeQueue.insert(callback);
}
2025
/**
 * Prepare a fiber for having an exception thrown in it.
 *
 * Params:
 *  fHandle = handle of the fiber the exception is meant for
 *  updateBT = value stored in the fiber's EXCEPTION_BT flag
 *  specialOkay = if false, asserts that the target is not flagged SPECIAL
 *
 * Returns: pointer to the fiber's exception buffer, or `null` if the handle is no longer valid or the fiber
 * already has an exception pending.
 */
ExcBuf* prepThrowInFiber(FiberHandle fHandle, bool updateBT, bool specialOkay = false) nothrow @safe @nogc {
    ReactorFiber* fib = fHandle.get();

    // The validity check has to come before anything that dereferences fib, the SPECIAL assert included
    if( fib is null ) {
        WARN!"Failed to throw exception in fiber %s which is no longer valid"(fHandle);
        return null;
    }

    ASSERT!"Cannot throw in the reactor's own fibers"( !fib.flag!"SPECIAL" || specialOkay );

    if( fib.flag!"HAS_EXCEPTION" ) {
        ERROR!"Tried to throw exception in fiber %s which already has an exception pending"(fHandle);
        return null;
    }

    fib.flag!"HAS_EXCEPTION" = true;
    fib.flag!"EXCEPTION_BT" = updateBT;
    return &fib.params.currExcBuf;
}
2043
/**
 * Hand `ex` over to the main fiber and switch away.
 *
 * The exception is stored in the main fiber's exception buffer, and the main fiber is resumed as a special fiber.
 * Returns only if the exception could not be set up (`prepThrowInFiber` returned null); otherwise the function
 * switches away and is not expected to come back.
 *
 * Params:
 *  ex = the exception to forward
 */
void forwardExceptionToMain(Throwable ex) nothrow @trusted @nogc {
    ExcBuf* mainExcBuf = prepThrowInFiber(FiberHandle(mainFiber), false, true);
    if( mainExcBuf is null )
        return;

    mainExcBuf.set(ex);

    if( !mainFiber.flag!"SPECIAL" ) {
        // The main fiber is only expected to have lost its special status while the reactor is shutting down
        ASSERT!"Main fiber not marked as special but reactor is not stopping"( _stopping );
        mainFiber.flag!"SPECIAL" = true;
    }

    resumeSpecialFiber(mainFiber);
    as!"nothrow"(&theReactor.switchToNext);
    assert(false, "switchToNext on dead system returned");
}
2059
/// Start the hang detector timer. In debug builds, asserts that the timer is not already set.
void registerHangDetector() @trusted @nogc {
    DBG_ASSERT!"registerHangDetector called twice"(!hangDetectorTimer.isSet);

    hangDetectorTimer.start();
}
2064
/// Cancel the hang detector timer.
void deregisterHangDetector() nothrow @trusted @nogc {
    hangDetectorTimer.cancel();
}
2068
/**
 * Hang detector callback.
 *
 * Does nothing if the detector is disabled, if the time since `fiberRunStartTime` is below the configured
 * timeout, or if the current fiber is the idle fiber. Otherwise logs the offending fiber, dumps a stack trace
 * and aborts the process.
 */
extern(C) static void hangDetectorHandler() nothrow @trusted @nogc {
    if( !theReactor._hangDetectorEnabled )
        return;

    auto delay = TscTimePoint.hardNow() - theReactor.fiberRunStartTime;

    // Still within the allowed run time
    if( delay<theReactor.optionsInEffect.hangDetectorTimeout )
        return;

    // The idle fiber is exempt
    if( theReactor.currentFiberId == IdleFiberId )
        return;

    long seconds, usecs;
    delay.split!("seconds", "usecs")(seconds, usecs);

    ERROR!"Hang detector triggered for %s after %s.%06s seconds"(theReactor.currentFiberId, seconds, usecs);
    dumpStackTrace();

    ABORT("Hang detector killed process");
}
2087
/**
 * Install `faultHandler` for SIGSEGV, SIGILL and SIGBUS.
 *
 * The handler is registered with SA_SIGINFO, SA_RESETHAND and SA_ONSTACK. A failure to register any of them is
 * reported through `errnoEnforceNGC`.
 */
void registerFaultHandlers() @trusted @nogc {
    posix_signal.sigaction_t faultAction;
    faultAction.sa_sigaction = &faultHandler;
    faultAction.sa_flags = posix_signal.SA_SIGINFO | posix_signal.SA_RESETHAND | posix_signal.SA_ONSTACK;

    errnoEnforceNGC(
            posix_signal.sigaction(OSSignal.SIGSEGV, &faultAction, null)==0, "Failed to register SIGSEGV handler" );
    errnoEnforceNGC(
            posix_signal.sigaction(OSSignal.SIGILL, &faultAction, null)==0, "Failed to register SIGILL handler" );
    errnoEnforceNGC(
            posix_signal.sigaction(OSSignal.SIGBUS, &faultAction, null)==0, "Failed to register SIGBUS handler" );
}
2097
/// Restore the default disposition (SIG_DFL) of SIGBUS, SIGILL and SIGSEGV.
void deregisterFaultHandlers() nothrow @trusted @nogc {
    posix_signal.signal(OSSignal.SIGBUS, posix_signal.SIG_DFL);
    posix_signal.signal(OSSignal.SIGILL, posix_signal.SIG_DFL);
    posix_signal.signal(OSSignal.SIGSEGV, posix_signal.SIG_DFL);
}
2103
/**
 * Signal handler for the fault signals (SIGSEGV, SIGILL, SIGBUS).
 *
 * Logs the fault name, a stack trace, the faulting address and the program counter. When running on the reactor
 * thread it also logs the current fiber's stack range, and says so if the address falls inside the guard zone
 * just below the stack.
 *
 * Params:
 *  signum = the signal received
 *  info = signal information; `si_addr` is logged as the faulting address
 *  ctx = the context pointer handed to the handler; the program counter is read from it when it is not null
 */
@notrace extern(C) static void faultHandler(int signum, siginfo_t* info, void* ctx) nothrow @trusted @nogc {
    string faultName = "Unknown fault";

    switch( cast(OSSignal)signum ) {
    case OSSignal.SIGSEGV:
        faultName = "Segmentation fault";
        break;
    case OSSignal.SIGILL:
        faultName = "Illegal instruction";
        break;
    case OSSignal.SIGBUS:
        faultName = "Bus error";
        break;
    default:
        break;
    }

    META!"#OSSIGNAL %s"(faultName);
    dumpStackTrace();
    // The code below might fault itself. Flush now, so that at least what was logged so far is kept
    flushLog();

    Ucontext* contextPtr = cast(Ucontext*)ctx;
    auto pc = contextPtr ? contextPtr.uc_mcontext.registers.rip : 0;

    if( !isReactorThread ) {
        ERROR!"%s on OS thread at address %s, PC %s"(faultName, info.si_addr, pc);
    } else {
        auto faultingFiber = theReactor.getCurrentFiberPtr(true);
        auto currentSD = faultingFiber.stackDescriptor;
        ERROR!"%s on %s address 0x%x, PC 0x%x stack params at 0x%x"(
                faultName, theReactor.currentFiberId, info.si_addr, pc, faultingFiber.params);
        ERROR!"Stack is at [%s .. %s]"( currentSD.bstack, currentSD.tstack );

        auto guardAddrStart = currentSD.bstack - GUARD_ZONE_SIZE;
        if( guardAddrStart <= info.si_addr && info.si_addr < currentSD.bstack ) {
            ERROR!"Hit stack guard area"();
        }
    }
    flushLog();

    // Returning from the handler re-executes the offending instruction. The handler was registered as run once, so
    // this time around the default handler kills the node.
}
2147
/**
 * The body of the main fiber while the reactor is running.
 *
 * Disables the GC for the duration of the run, starts the hang detector if a timeout is configured, registers the
 * recurring GC timer (unless `utGcDisabled` is set), and then loops: run due timers, collect garbage if requested,
 * switch to the next fiber. When `_stopping` is set (or a `ReactorExit` is caught), `performStopReactor` is called.
 * Everything set up here is undone by scope guards on the way out.
 */
void mainloop() {
    assert (isOpen);
    assert (!isRunning);
    assert (_thisFiber is null);

    _running = true;
    scope(exit) _running = false;

    GC.disable();
    scope(exit) GC.enable();

    lastGCStats = GC.stats();

    if( optionsInEffect.utGcDisabled ) {
        // No collections will happen while the reactor runs, so do one up front
        GC.collect();
    }

    // The hang detector is only registered after the GC has finished running
    if( optionsInEffect.hangDetectorTimeout !is Duration.zero ) {
        registerHangDetector();
        _hangDetectorEnabled = true;
    } else {
        _hangDetectorEnabled = false;
    }

    scope(exit) {
        if( optionsInEffect.hangDetectorTimeout !is Duration.zero ) {
            deregisterHangDetector();
        }

        _hangDetectorEnabled = false;
    }

    _thisFiber = mainFiber;
    scope(exit) _thisFiber = null;

    if( !optionsInEffect.utGcDisabled ) {
        TimerHandle gcTimer = registerRecurringTimer!requestGCCollectionInternal(optionsInEffect.gcInterval, false);
    }

    try {
        while (!_stopping) {
            DBG_ASSERT!"Switched to mainloop with wrong thisFiber %s"(thisFiber is mainFiber, thisFiber.identity);
            runTimedCallbacks();

            if( _gcCollectionNeeded ) {
                gcCollect();
            }

            // The timers or the collection above might have requested a stop
            if( _stopping )
                break;

            switchToNext();
        }
    } catch( ReactorExit ex ) {
        ASSERT!"Main loop threw ReactorExit, but reactor is not stopping"(_stopping);
    }

    performStopReactor();
}
2203
/**
 * Run a GC collection cycle, if one is warranted.
 *
 * Clears the "collection needed" flag unconditionally. The collection itself runs if it was forced, if the
 * configured threshold is zero, or if the used size has grown by more than the threshold since the last recorded
 * statistics.
 */
void gcCollect() {
    _gcCollectionNeeded = false;
    auto statsBefore = GC.stats();

    const bool shouldCollect =
        _gcCollectionForce ||
        optionsInEffect.gcRunThreshold==0 ||
        statsBefore.usedSize > lastGCStats.usedSize+optionsInEffect.gcRunThreshold;

    if( !shouldCollect )
        return;

    TscTimePoint.hardNow(); // Update the hard now value
    DEBUG!"#GC collection cycle started, %s bytes allocated since last run (forced %s)"(statsBefore.usedSize - lastGCStats.usedSize, _gcCollectionForce);

    GC.collect();

    TscTimePoint.hardNow(); // Update the hard now value
    lastGCStats = GC.stats();
    DEBUG!"#GC collection cycle ended, freed %s bytes"(statsBefore.usedSize - lastGCStats.usedSize);

    _gcCollectionForce = false;
}
2224
/**
 * Shut down all fibers. Must run on the main fiber.
 *
 * Throws a `ReactorExit` in every live fiber other than the main one (after clearing its SPECIAL flag), clears
 * the main fiber's own SPECIAL flag, yields once, and finally resets `_stopping`.
 */
void performStopReactor() @nogc {
    ASSERT!"performStopReactor must be called from the main fiber. Use Reactor.stop instead"( isMain );

    Throwable reactorExit = mkEx!ReactorExit("Reactor is quitting");

    // Slot 0 is skipped: all fibers but main
    foreach(ref fiber; allFibers[1..$]) {
        if( !fiber.isAlive )
            continue;

        fiber.flag!"SPECIAL" = false;
        throwInFiber(FiberHandle(&fiber), reactorExit);
    }

    thisFiber.flag!"SPECIAL" = false;
    yield();

    META!"Stopping reactor"();
    _stopping = false;
}
2242
/**
 * Record a name and an associated pointer in the fiber's parameters.
 *
 * Params:
 *  fib = the fiber to label
 *  name = stored as `params.fiberName`
 *  ptr = stored as `params.fiberPtr`
 */
@notrace void setFiberName(ReactorFiber* fib, string name, void *ptr) nothrow @safe @nogc {
    fib.params.fiberPtr = ptr;
    fib.params.fiberName = name;
}
2247
/**
 * Record a name in the fiber's parameters, using a delegate's context pointer as the associated pointer.
 *
 * Params:
 *  fib = the fiber to label
 *  name = stored as `params.fiberName`
 *  dlg = delegate whose `.ptr` is stored as `params.fiberPtr`
 */
@notrace void setFiberName(T)(ReactorFiber* fib, string name, scope T dlg) nothrow @safe @nogc if( isDelegate!T ) {
    fib.params.fiberPtr = dlg.ptr;
    fib.params.fiberName = name;
}
2252
/**
 * Run the pending cross fiber hook, then switch back to the fiber that requested it.
 *
 * The hook runs under a critical section and must not throw. Both the hook and the record of who requested it
 * are cleared before switching back.
 */
void performCrossFiberHook() @safe @nogc {
    ReactorFiber* hookRequester;

    // The hook is performed under critical section
    with(criticalSection()) {
        scope(failure) ASSERT!"Cross fiber hook threw"(false);
        crossFiberHook();

        hookRequester = crossFiberHookCaller.get();
        ASSERT!"Fiber invoking hook %s invalidated by hook"(hookRequester !is null, crossFiberHookCaller.identity);

        // Clear everything, so that the hook is not invoked again upon returning to the requesting fiber
        crossFiberHook = null;
        crossFiberHookCaller.reset();
    }

    switchTo(hookRequester);
}
2271
/**
 * Run `callback` in the context of another fiber.
 *
 * If the handle is not valid nothing happens. If it refers to the current fiber, the callback is simply run under
 * a critical section. Otherwise the callback is stored as the cross fiber hook and execution switches to the
 * target fiber, which must be in the `Starting` or `Sleeping` state.
 *
 * Params:
 *  fh = the fiber to run the callback in
 *  callback = the code to run
 */
@notrace void callInFiber(FiberHandle fh, scope void delegate() nothrow @safe @nogc callback) @trusted @nogc {
    ASSERT!"Cannot set hook when one is already set"( crossFiberHook is null && !crossFiberHookCaller.isSet );

    ReactorFiber* fib = fh.get();
    if( fib is null ) {
        // Not a valid fiber - there is nothing to do
        return;
    }

    if( fib is thisFiber ) {
        // The target is the calling fiber itself, so no switch is needed
        auto critSect = criticalSection();
        callback();

        return;
    }

    ASSERT!"Trying to dump stack trace of %s which is in invalid state %s"(
            fib.state==FiberState.Starting || fib.state==FiberState.Sleeping, fib.identity, fib.state );

    // A scoped delegate is stored in a long living pointer here. This is okay, as we are done with it before exiting.
    crossFiberHook = callback;
    crossFiberHookCaller = FiberHandle(thisFiber); // Don't call currentFiber, as we might be a special fiber

    switchTo(fib);

    DBG_ASSERT!"crossFiberHookCaller not cleared after call"( !crossFiberHookCaller.isSet );
    DBG_ASSERT!"crossFiberHook not cleared after call"( crossFiberHook is null );
}
2301
import std.string : format;

/**
 * Source code of a `<LEVEL>_AS` logging function, to be mixed in once per log level.
 *
 * The generated function logs a message as if it had been issued by the fiber `fh`: it calls `logSwitchInto` on
 * that fiber, logs through the matching `<LEVEL>` logger, and calls `logSwitchInto` on the current fiber again
 * when done. If `fh` is no longer valid, an ERROR is logged instead, carrying the handle and the original message.
 *
 * Params:
 *  logLevel = name of the log template to wrap (e.g. "DEBUG")
 */
enum string decl_log_as(string logLevel) = q{
    @notrace public void %1$s_AS(
        string fmt, string file = __FILE_FULL_PATH__, string mod = __MODULE__, int line = __LINE__, T...)
        (FiberHandle fh, T args) nothrow @safe @nogc
    {
        auto fiber = fh.get;
        if( fiber is null ) {
            ERROR!("Can't issue %1$s log as %%s. Original log: "~fmt, file, mod, line)(fh, args);
            return;
        }

        auto currentFiber = getCurrentFiberPtr(true);
        fiber.logSwitchInto();
        scope(exit) currentFiber.logSwitchInto();

        %1$s!(fmt, file, mod, line)(args);
    }
}.format(logLevel);

// One *_AS function per log level
mixin(decl_log_as!"DEBUG");
mixin(decl_log_as!"INFO");
mixin(decl_log_as!"WARN");
mixin(decl_log_as!"ERROR");
mixin(decl_log_as!"META");
2326
/**
 * Log the stack trace of a given fiber.
 *
 * The fiber should be in a valid state. The effect is the same as that fiber calling `dumpStackTrace` itself.
 *
 * Params:
 *  fh = the fiber whose stack trace is wanted
 *  text = passed on to `dumpStackTrace`
 *  file = source file to attribute the trace to (defaults to the caller's)
 *  line = source line to attribute the trace to (defaults to the caller's)
 */
@notrace public void LOG_TRACEBACK_AS(
        FiberHandle fh, string text, string file = __FILE_FULL_PATH__, size_t line = __LINE__) @safe @nogc
{
    callInFiber(fh, () {
            dumpStackTrace(text, file, line);
        });
}
2339 }
2340
// The conversions to/from ReactorFiber are only exposed to the reactor package

/// Convert a fiber index to a pointer into the reactor's fiber array. An invalid index converts to `null`.
package ReactorFiber* to(T : ReactorFiber*)(FiberIdx fidx) nothrow @safe @nogc {
    if (fidx.isValid) {
        ASSERT!"Reactor is not open"( theReactor.isOpen );
        return &theReactor.allFibers[fidx.value];
    }

    return null;
}
2349
/// Convert a pointer into the reactor's fiber array to the matching index. `null` converts to the invalid index.
package FiberIdx to(T : FiberIdx)(const ReactorFiber* rfp) nothrow @safe @nogc {
    if (rfp is null)
        return FiberIdx.invalid;

    ASSERT!"Reactor is not open"( theReactor.isOpen );

    // The index is the distance, in elements, from the first fiber in the pool
    auto poolBase = &theReactor.allFibers.arr[0];
    auto idx = rfp - poolBase;
    DBG_ASSERT!"Reactor fiber pointer not pointing to fibers pool: base %s ptr %s idx %s"(
            idx>=0 && idx<theReactor.allFibers.arr.length, poolBase, rfp, idx);

    return FiberIdx( cast(ushort)idx );
}
2360
/// Extract the fiber index from a fiber id, by masking the id's value with the reactor's `maxNumFibersMask`.
package FiberIdx to(T : FiberIdx)(FiberId fiberId) nothrow @safe @nogc {
    auto slot = fiberId.value & theReactor.maxNumFibersMask;
    return FiberIdx( slot );
}
2364
// The Reactor instance returned by the `theReactor` property. Declared __gshared, i.e. not thread local.
private __gshared Reactor _theReactor;
// Per thread flag returned by the `isReactorThread` property.
private bool /* thread local */ _isReactorThread;
2367
/**
 * Access the Reactor singleton.
 *
 * Strictly speaking, @safe code is not supposed to touch global variables. theReactor, however, is only meant to
 * be used from a single thread, and so this accessor is marked @trusted. Were it not, hardly any code could be
 * @safe.
 *
 * Returns: a reference to the one Reactor instance.
 */
@property ref Reactor theReactor() nothrow @trusted @nogc {
    //DBG_ASSERT!"not main thread"(_isReactorThread);
    return _theReactor;
}
2378
/// Whether the calling thread is the one theReactor is running in.
@property bool isReactorThread() nothrow @safe @nogc {
    return _isReactorThread;
}
2383
2384 version (unittest) {
/**
 * Run a test inside a reactor
 *
 * Convenience function for running a UT as a reactor fiber. A new reactor is set up, `dg` is called in a fiber
 * of its own, and the reactor is stopped automatically when `dg` is done.
 *
 * Params:
 *  dg = the test body. Its return value is passed to `theReactor.stop`.
 *  options = the options to set the reactor up with
 *
 * Returns: whatever `theReactor.start` returned.
 */
int testWithReactor(int delegate() dg, Reactor.OpenOptions options = Reactor.OpenOptions.init) {
    // Run the test with no signal blocked
    sigset_t emptyMask;
    errnoEnforceNGC( sigprocmask( SIG_SETMASK, &emptyMask, null )==0, "sigprocmask failed" );

    theReactor.setup(options);
    scope(success) theReactor.teardown();

    bool delegateReturned = false;

    void fiberMain() {
        int testResult;

        try {
            testResult = dg();

            delegateReturned = true;
        } catch(ReactorExit ex) {
            LOG_EXCEPTION(ex);
            assert(false, "testWithReactor's body called theReactor.stop explicitly");
        } catch(FiberInterrupt ex) {
            LOG_EXCEPTION(ex);
            theReactor.stop();
        } catch(Throwable ex) {
            // The reactor need not be stopped here - the exception thrown will terminate it
            LOG_EXCEPTION(ex);
            ERROR!"Test terminated abnormally"();
            throw ex;
        }

        theReactor.stop( testResult );
    }

    theReactor.spawnFiber(&fiberMain);
    int exitCode = theReactor.start();
    assert (delegateReturned, "testWithReactor called with a delegate that threw without returning");

    return exitCode;
}
2428
/// ditto
void testWithReactor(void delegate() dg, Reactor.OpenOptions options = Reactor.OpenOptions.init) {
    // Adapt the void delegate to the int returning overload. The result is always 0.
    int zeroReturningAdapter() {
        dg();

        return 0;
    }

    testWithReactor(&zeroReturningAdapter, options);
}
2439
2440 public import mecca.runtime.ut: mecca_ut;
2441
/**
 * Declare a unittest that runs all of `FIXTURE`'s test cases inside a reactor.
 *
 * If `FIXTURE` has a `reactorOptions` member, the reactor is opened with it. Otherwise the default options are
 * used. Any exception escaping the test cases is logged, and the run is terminated through `DIE`.
 */
mixin template TEST_FIXTURE_REACTOR(FIXTURE) {
    import mecca.runtime.ut: runFixtureTestCases;
    import mecca.reactor: testWithReactor, Reactor;
    unittest {
        Reactor.OpenOptions options;

        static if( __traits(hasMember, FIXTURE, "reactorOptions") ) {
            options = FIXTURE.reactorOptions;
        }

        void runAllCases() {
            try {
                runFixtureTestCases!(FIXTURE)();
            } catch( Throwable ex ) {
                import mecca.log: LOG_EXCEPTION;
                import mecca.lib.exception: DIE;

                LOG_EXCEPTION(ex);
                DIE("UT failed due to exception");
            }
        }

        testWithReactor(&runAllCases, options);
    }
}
2465 }
2466
2467
unittest {
    // Two fibers, each printing its name and yielding ten times before asking the reactor to stop
    import std.stdio;

    theReactor.setup();
    scope(exit) theReactor.teardown();

    static void fibFunc(string name) {
        for( int round = 0; round < 10; round++ ) {
            writeln(name);
            theReactor.yield();
        }

        theReactor.stop();
    }

    theReactor.spawnFiber(&fibFunc, "hello");
    theReactor.spawnFiber(&fibFunc, "world");
    theReactor.start();
}
2486
unittest {
    // Test simple timeout: six fibers sleep for various durations, a seventh stops the reactor after 250ms
    import std.stdio;

    theReactor.setup();
    scope(exit) theReactor.teardown();

    uint counter;
    TscTimePoint start;

    void fiberFunc(Duration duration) {
        INFO!"Fiber %s sleeping for %s"(theReactor.currentFiberHandle, duration.toString);
        theReactor.sleep(duration);
        auto now = TscTimePoint.hardNow;
        counter++;
        INFO!"Fiber %s woke up after %s, overshooting by %s counter is %s"(theReactor.currentFiberHandle, (now - start).toString,
                ((now-start) - duration).toString, counter);
    }

    void ender() {
        INFO!"Fiber %s ender is sleeping for 250ms"(theReactor.currentFiberHandle);
        theReactor.sleep(dur!"msecs"(250));
        INFO!"Fiber %s ender woke up"(theReactor.currentFiberHandle);

        theReactor.stop();
    }

    // Sleepers are spawned in this exact order. All of them are shorter than the ender's 250ms.
    foreach(sleepMs; [10, 100, 150, 20, 30, 200]) {
        theReactor.spawnFiber(&fiberFunc, dur!"msecs"(sleepMs));
    }
    theReactor.spawnFiber(&ender);

    start = TscTimePoint.hardNow;
    theReactor.start();
    auto end = TscTimePoint.hardNow;
    INFO!"UT finished in %s"((end - start).toString);

    assert(counter == 6, "Not all fibers finished");
}
2529
unittest {
    // Test suspending timeout: suspending with a 4ms timeout and no one to resume us must throw TimeoutExpired
    import std.stdio;

    theReactor.setup();
    scope(exit) theReactor.teardown();

    void fiberFunc() {
        bool timedOut = false;

        try {
            theReactor.suspendCurrentFiber( Timeout(dur!"msecs"(4)) );
        } catch(TimeoutExpired ex) {
            timedOut = true;
        }

        assert(timedOut);

        theReactor.stop();
    }

    theReactor.spawnFiber(&fiberFunc);
    theReactor.start();
}
2554
unittest {
    // Test one-shot timers (including cancellation) and a recurring timer
    import std.stdio;

    // GC running during the test mess with the timing
    Reactor.OpenOptions oo;
    oo.utGcDisabled = true;

    theReactor.setup(oo);
    scope(exit) theReactor.teardown();

    void fiberFunc() {
        TimerHandle[8] handles;
        Duration[8] timeouts = [
            dur!"msecs"(2),
            dur!"msecs"(200),
            dur!"msecs"(6),
            dur!"msecs"(120),
            dur!"msecs"(37),
            dur!"msecs"(40),
            dur!"msecs"(133),
            dur!"msecs"(8),
        ];

        // Bit N gets set when timer N fires
        ubyte firedBits;

        static void timer(ubyte* bits, TimerHandle* handle, ubyte bit) {
            (*bits) |= 1<<bit;

            (*handle) = TimerHandle.init;
        }

        foreach(ubyte i, duration; timeouts) {
            handles[i] = theReactor.registerTimer!timer( Timeout(duration), &firedBits, &handles[i], i );
        }

        uint recurringCounter;
        static void recurringTimer(uint* counter) {
            (*counter)++;
        }

        TimerHandle recurringTimerHandle = theReactor.registerRecurringTimer!recurringTimer( dur!"msecs"(7), &recurringCounter );

        theReactor.sleep(dur!"msecs"(3));

        // Cancel one timer that has already expired, and one that is yet to happen
        handles[0].cancelTimer();
        handles[6].cancelTimer();

        // Give all the timers the time to run
        theReactor.sleep(dur!"msecs"(200));

        // Every bit but bit 6, whose timer was cancelled before it fired
        assert(firedBits == 0b1011_1111);
        ASSERT!"Recurring timer should run 29 times, ran %s"(recurringCounter>=25 && recurringCounter<=30, recurringCounter); // 203ms / 7

        theReactor.stop();
    }

    theReactor.spawnFiber(&fiberFunc);
    theReactor.start();
}
2616
unittest {
    // Test throwing an exception in another fiber, along with the fiber state statistics
    import mecca.reactor.sync.event;

    // On Linux a fiber is running for the signal handler. On Darwin there is no such fiber.
    version (Darwin)
        enum startupFiberCount = 0;
    else version (linux)
        enum startupFiberCount = 1;

    theReactor.setup();
    scope(exit) theReactor.teardown();

    Event evt1, evt2;

    class TheException : Exception {
        this() {
            super("The Exception");
        }
    }

    void fib2() {
        // Let fib1 go on
        evt1.set();

        try {
            // Wait for fib1 to do its part. The wait is expected to end with the exception fib1 throws in us.
            evt2.wait();

            assert( false, "Exception not thrown" );
        } catch( Exception ex ) {
            assert( ex.msg == "The Exception" );
        }

        theReactor.stop();
    }

    void fib1() {
        assertEQ( theReactor.getFiberState(theReactor.currentFiberHandle), FiberState.Running );
        assertEQ( theReactor.reactorStats.fibersHistogram[FiberState.Starting], 0 );

        auto victim = theReactor.spawnFiber(&fib2);
        assertEQ( theReactor.getFiberState(victim), FiberState.Starting );
        assertEQ( theReactor.reactorStats.fibersHistogram[FiberState.Starting], 1 );

        evt1.wait();

        assertEQ( theReactor.reactorStats.fibersHistogram[FiberState.Starting], 0 );

        assertEQ( theReactor.reactorStats.fibersHistogram[FiberState.Sleeping], startupFiberCount + 1 );
        theReactor.throwInFiber(victim, new TheException);
        // The following should be "1", because the state would switch to Scheduled. Since that's not implemented yet...
        assertEQ( theReactor.reactorStats.fibersHistogram[FiberState.Sleeping], startupFiberCount + 1 ); // Should be 1
        assertEQ( theReactor.reactorStats.fibersHistogram[FiberState.Scheduled], 0 ); // Should 1
        evt2.set();
    }

    theReactor.spawnFiber(&fib1);
    theReactor.start();
}
2675
unittest {
    // Run with a 20ms hang detector. Sleeping through the reactor for 200ms must not trigger it.
    Reactor.OpenOptions options;
    options.utGcDisabled = true;
    options.hangDetectorTimeout = 20.msecs;
    DEBUG!"sanity: %s"(options.hangDetectorTimeout.toString);

    void sleeper() {
        theReactor.sleep(200.msecs);
        /+
        // To trigger the hang, uncomment this:
        import core.thread;
        Thread.sleep(200.msecs);
        +/
    }

    testWithReactor(&sleeper, options);
}
2691
2692 /+
2693 unittest {
2694 // Trigger a segmentation fault
2695 theReactor.options.hangDetectorTimeout = 20.msecs;
2696 DEBUG!"sanity: %s"(theReactor.options.hangDetectorTimeout);
2697
2698 testWithReactor({
2699 int* a = cast(int*) 16;
2700 *a = 3;
2701 });
2702 }
2703 +/
2704
unittest {
    // Nothing here fails automatically, but the *_AS loggers at least get exercised
    import mecca.reactor.sync.event;
    Event finish;

    META!"#UT exercise log_AS functions"();

    void fiberFunc() {
        DEBUG!"Fiber started"();
        finish.wait();
        DEBUG!"Fiber finished"();
    }

    void testBody() {
        auto target = theReactor.spawnFiber(&fiberFunc);
        theReactor.yield();

        // One call for each of the log levels
        DEBUG!"Main fiber logging as %s"(target);
        theReactor.DEBUG_AS!"DEBUG trace with argument %s"(target, 17);
        theReactor.INFO_AS!"INFO trace"(target);
        theReactor.WARN_AS!"WARN trace"(target);
        theReactor.ERROR_AS!"ERROR trace"(target);
        theReactor.META_AS!"META trace"(target);

        DEBUG!"Killing fiber"();
        finish.set();
        theReactor.yield();

        // By now the handle is stale
        DEBUG!"Trying to log as dead fiber"();
        theReactor.DEBUG_AS!"DEBUG trace on dead fiber with argument %s"(target, 18);
    }

    testWithReactor(&testBody);
}
2739
unittest {
    // A relaunched fiber must not immediately get the same FiberId as the one before it
    FiberId firstId;

    void test1() {
        firstId = theReactor.currentFiberId();
    }

    void test2() {
        assert(theReactor.currentFiberId() != firstId);
    }

    void launchBoth() {
        theReactor.spawnFiber(&test1);
        theReactor.yield();
        theReactor.spawnFiber(&test2);
        theReactor.yield();
    }

    testWithReactor(&launchBoth);
}
2759
unittest {
    // FiberHandle.get must give back the very ReactorFiber* it was created from
    static void fiberBody() {
        assert( theReactor.currentFiberPtr == theReactor.currentFiberHandle.get() );
    }

    void launchTwice() {
        // Running twice makes sure the generation isn't 0
        theReactor.spawnFiber!fiberBody();
        theReactor.yield();
        theReactor.spawnFiber!fiberBody();
        theReactor.yield();
    }

    testWithReactor(&launchTwice);
}
2773
unittest {
    // The value returned by the test body must be the one testWithReactor returns
    int returnSeventeen() {
        return 17;
    }

    int ret = testWithReactor( &returnSeventeen );

    assert( ret==17 );
}
2779
unittest {
    // test priority of scheduled fibers

    uint gen;
    string[] runOrder;

    // Fiber body: optionally boost own priority, suspend, and record the id upon being resumed
    void verify(bool Priority)(string id) {

        static if( Priority ) {
            auto priorityWatcher = theReactor.boostFiberPriority();
        }
        theReactor.suspendCurrentFiber();
        runOrder ~= id;
    }

    void scenario() {
        FiberHandle[] fh;
        fh ~= theReactor.spawnFiber(&verify!false, "reg1");
        fh ~= theReactor.spawnFiber(&verify!false, "reg2");
        fh ~= theReactor.spawnFiber(&verify!false, "imm1"); // Scheduled immediate
        fh ~= theReactor.spawnFiber(&verify!true, "pri1");
        fh ~= theReactor.spawnFiber(&verify!false, "imm2"); // Scheduled immediate
        fh ~= theReactor.spawnFiber(&verify!true, "pri2");
        fh ~= theReactor.spawnFiber(&verify!false, "reg3");
        fh ~= theReactor.spawnFiber(&verify!true, "pri3"); // reg1 bypasses this one in order to avoid starvation
        fh ~= theReactor.spawnFiber(&verify!false, "imm3"); // Scheduled immediate
        theReactor.yield();

        // Entries 2 and 4 are resumed as immediate, all others as regular
        foreach(i, handle; fh) {
            theReactor.resumeFiber( handle, i==2 || i==4 );
        }

        // An immediate resume of a fiber that is already scheduled should change its priority
        theReactor.resumeFiber( fh[8], true );

        theReactor.yield();
        assertEQ(["imm1", "imm2", "imm3", "pri1", "pri2", "reg1", "pri3", "reg2", "reg3"], runOrder);
    }

    testWithReactor(&scenario);
}
2817
unittest {
    // Test join

    static void fiberBody() {
        foreach(round; 0 .. 3) {
            theReactor.yield();
        }
    }

    void joiner() {
        FiberHandle fh = theReactor.spawnFiber!fiberBody();
        assertEQ( theReactor.getFiberState(fh), FiberState.Starting );

        theReactor.yield();
        assertEQ( theReactor.getFiberState(fh), FiberState.Scheduled );

        theReactor.joinFiber(fh);
        assertEQ( theReactor.getFiberState(fh), FiberState.None );

        // Joining a fiber that has already finished, this time with a zero timeout
        theReactor.joinFiber(fh, Timeout(Duration.zero));
    }

    testWithReactor(&joiner);
}
2837
unittest {
    // Throw FiberKilled in fibers that never finish on their own
    testWithReactor({
        void doNothing() {
            for (;;)
                theReactor.sleep(1.msecs);
        }

        FiberHandle[] handles;
        foreach(n; 0 .. 2) {
            handles ~= theReactor.spawnFiber(&doNothing);
        }
        theReactor.sleep(5.msecs);

        foreach(victim; handles) {
            theReactor.throwInFiber!FiberKilled(victim);
        }

        DEBUG!"Sleeping for 1ms"();
        theReactor.sleep(1.msecs);
        DEBUG!"Woke up from sleep"();
    });
}
2859
unittest {
    // Exercise LOG_TRACEBACK_AS four times on a fiber that was spawned to sleep for a second
    testWithReactor({
        static void fiber() {
            theReactor.sleep(1.seconds);
        }

        auto sleeper = theReactor.spawnFiber!fiber();

        DEBUG!"Starting test"();
        for( int attempt = 0; attempt < 4; attempt++ ) {
            theReactor.LOG_TRACEBACK_AS(sleeper, "test");
            DEBUG!"Back in fiber"();
            theReactor.yield();
        }
        DEBUG!"Test ended"();
    });
}