/// Define the micro-threading reactor
module mecca.reactor;

// Licensed under the Boost license. Full copyright information in the AUTHORS file

static import posix_signal = core.sys.posix.signal;
static import posix_time = core.sys.posix.time;
import core.memory: GC;
import core.sys.posix.signal;
import core.sys.posix.sys.mman: munmap, mprotect, PROT_NONE;
import core.thread: thread_isMainThread;

import std.exception;
import std.string;
import std.traits;

import mecca.containers.arrays;
import mecca.containers.lists;
import mecca.containers.pools;
import mecca.lib.concurrency;
import mecca.lib.consts;
import mecca.lib.exception;
import mecca.lib.integers : bitsInValue, createBitMask;
import mecca.lib.memory;
import mecca.lib.reflection;
import mecca.lib.time;
import mecca.lib.time_queue;
import mecca.lib.typedid;
import mecca.log;
import mecca.log.impl;
import mecca.platform.os;
import mecca.reactor.fiber_group;
import mecca.reactor.fls;
import mecca.reactor.impl.fibril: Fibril;
import mecca.reactor.subsystems.threading;
import mecca.reactor.sync.event: Signal;
public import mecca.reactor.types;

import std.stdio;

/// Handle for manipulating registered timers.
alias TimerHandle = Reactor.TimerHandle;
alias FiberIncarnation = ushort;

// The slot number in the stacks for the fiber
private alias FiberIdx = AlgebraicTypedIdentifier!("FiberIdx", ushort, ushort.max, ushort.max);

/// Track the fiber's state
enum FiberState : ubyte {
    None,       /// Fiber isn't running
    Starting,   /// Fiber was spawned, but has not yet started running
    Scheduled,  /// Fiber is waiting to run
    Running,    /// Fiber is running
    Sleeping,   /// Fiber is currently suspended
    Done,       /// Fiber has finished running
}

private {
    // Exception-handling context swap entry points. `ReactorFiber.swapEhContextChooser` picks between the two at
    // run time.
    extern(C) void* _d_eh_swapContext(void* newContext) nothrow @nogc;
    extern(C) void* _d_eh_swapContextDwarf(void* newContext) nothrow @nogc;
}

/// Book-keeping for a single fiber slot: its fibril, flags, state and link into the scheduling lists.
struct ReactorFiber {
    // Prevent accidental copying
    @disable this(this);

    /// Per-fiber data placed at the end of the fiber's stack area (see `setup`).
    static struct OnStackParams {
        Closure fiberBody;
        union {
            DRuntimeStackDescriptor _stackDescriptor;    // Used by non-main fibers
            DRuntimeStackDescriptor* stackDescriptorPtr; // Used by the main fiber
        }
        FiberGroup.Chain fgChain;
        FLSArea flsBlock;
        ExcBuf currExcBuf;
        LogsFiberSavedContext logsSavedContext;
        string fiberName;       // String identifying what this fiber is doing
        void* fiberPtr;         // The same with a numerical value
        Signal joinWaiters;
    }
    enum Flags: ubyte {
        // XXX Do we need CALLBACK_SET?
        CALLBACK_SET   = 0x01,  /// Has callback set
        SPECIAL        = 0x02,  /// Special fiber. Not a normal user fiber
        MAIN           = 0x04,  /// This is the main fiber (i.e. - the "not a fiber")
        SCHEDULED      = 0x08,  /// Fiber currently scheduled to be run
        SLEEPING       = 0x10,  /// Fiber is sleeping on a sync object
        HAS_EXCEPTION  = 0x20,  /// Fiber has pending exception to be thrown in it
        EXCEPTION_BT   = 0x40,  /// Fiber exception needs to have fiber's backtrace
        PRIORITY       = 0x80,  /// Fiber is a high priority one
    }

align(1):
    Fibril fibril;
    OnStackParams* params;
    LinkedListWithOwner!(ReactorFiber*)* _owner;
    FiberIdx _nextId;
    FiberIdx _prevId;
    FiberIncarnation incarnationCounter;
    ubyte _flags;
    FiberState _state;
    // Starts out pointing at the chooser, which replaces it with the concrete swap function once it can tell which
    // one is in use.
    static extern(C) void* function(void*) @nogc nothrow _swapEhContext = &swapEhContextChooser;

    // We define this struct align(1) for the sole purpose of making the following static assert verify what it's supposed to
    static assert (this.sizeof == 32);  // keep it small and cache-line friendly

    // LinkedQueue access through property
    @property ReactorFiber* _next() const nothrow @safe @nogc {
        return to!(ReactorFiber*)(_nextId);
    }

    @property void _next(FiberIdx newNext) nothrow @safe @nogc {
        _nextId = newNext;
    }

    @property void _next(ReactorFiber* newNext) nothrow @safe @nogc {
        _nextId = to!FiberIdx(newNext);
    }

    @property ReactorFiber* _prev() const nothrow @safe @nogc {
        return to!(ReactorFiber*)(_prevId);
    }

    @property void _prev(FiberIdx newPrev) nothrow @safe @nogc {
        _prevId = newPrev;
    }

    @property void _prev(ReactorFiber* newPrev) nothrow @safe @nogc {
        _prevId = to!FiberIdx(newPrev);
    }

    /**
      Prepare this fiber slot for use on top of `stackArea`.

      The last `OnStackParams.sizeof` bytes of `stackArea` hold the fiber's `OnStackParams`. For a non-main fiber the
      fibril is set to run `wrapper` on the remainder of the area, and a stack descriptor spanning the area is
      added. For the main fiber the `MAIN` flag is set and the thread's current `m_curr` descriptor is used instead.

      Params:
      stackArea = memory to use for the fiber's stack and parameters
      main = whether this is the main fiber
     */
    @notrace void setup(void[] stackArea, bool main) nothrow @nogc {
        _flags = 0;

        if( !main )
            fibril.set(stackArea[0 .. $ - OnStackParams.sizeof], &wrapper);
        else
            flag!"MAIN" = true;

        params = cast(OnStackParams*)&stackArea[$ - OnStackParams.sizeof];
        setToInit(params);

        if( !main ) {
            params._stackDescriptor.bstack = stackArea.ptr + stackArea.length; // Include params, as FLS is stored there
            params._stackDescriptor.tstack = fibril.rsp;
            params._stackDescriptor.add();
        } else {
            import core.thread: Thread;
            params.stackDescriptorPtr = cast(DRuntimeStackDescriptor*)accessMember!("m_curr")(Thread.getThis());
            DBG_ASSERT!"MAIN not set on main fiber"( flag!"MAIN" );
        }

        _next = null;
        incarnationCounter = 0;
    }

    /// Undo `setup`: reset the fibril, remove the stack descriptor (non-main fibers only) and drop `params`.
    @notrace void teardown() nothrow @nogc {
        fibril.reset();
        if (!flag!"MAIN") {
            params._stackDescriptor.remove();
        }
        params = null;
    }

    /// Switch execution from this fiber to `next`, swapping the EH context and the thread's current stack descriptor.
    @notrace void switchTo(ReactorFiber* next) nothrow @trusted @nogc {
        pragma(inline, true);
        import core.thread: Thread;

        DRuntimeStackDescriptor* currentSD = stackDescriptor;
        DRuntimeStackDescriptor* nextSD = next.stackDescriptor;

        currentSD.ehContext = _swapEhContext(nextSD.ehContext);

        // Since druntime does not expose the interfaces needed for switching fibers, we need to hack around the
        // protection system to access Thread.m_curr, which is private.
        DRuntimeStackDescriptor** threadCurrentSD = cast(DRuntimeStackDescriptor**)&accessMember!("m_curr")(Thread.getThis());
        *threadCurrentSD = nextSD;

        fibril.switchTo(next.fibril, &currentSD.tstack);
    }

    // The stack descriptor of this fiber: embedded in `params` for regular fibers, pointed to by `params` for the
    // main fiber.
    @notrace @property private DRuntimeStackDescriptor* stackDescriptor() @trusted @nogc nothrow {
        if( !flag!"MAIN" ) {
            return &params._stackDescriptor;
        } else {
            return params.stackDescriptorPtr;
        }
    }

    /// The `FiberId` of this fiber: the slot index, with the incarnation counter stored above `maxNumFibersBits`.
    @property FiberId identity() const nothrow @safe @nogc {
        FiberId.UnderlyingType res;
        res = to!FiberIdx(&this).value;
        res |= incarnationCounter << theReactor.maxNumFibersBits;

        return FiberId(res);
    }

    /// Query the flag from `Flags` named `NAME`.
    @property bool flag(string NAME)() const pure nothrow @safe @nogc {
        return (_flags & __traits(getMember, Flags, NAME)) != 0;
    }
    /// Set or clear the flag from `Flags` named `NAME`.
    @property void flag(string NAME)(bool value) pure nothrow @safe @nogc {
        if (value) {
            _flags |= __traits(getMember, Flags, NAME);
        }
        else {
            import mecca.lib.integers: bitComplement;
            _flags &= bitComplement(__traits(getMember, Flags, NAME));
        }
    }

    /// The fiber's current `FiberState`.
    @property FiberState state() pure const nothrow @safe @nogc {
        return _state;
    }

    /// Change the fiber's state, keeping the reactor's per-state fibers histogram in sync.
    @property void state(FiberState newState) nothrow @safe @nogc {
        DBG_ASSERT!"%s trying to switch state from %s to %s, but histogram claims no fibers in this state. %s"(
                theReactor.stats.fibersHistogram[_state]>0, identity, _state, newState,
                theReactor.stats.fibersHistogram );

        theReactor.stats.fibersHistogram[_state]--;
        theReactor.stats.fibersHistogram[newState]++;

        _state = newState;
    }

    /// Whether the fiber is in any state other than `None`. Observing the `Done` state here is an error.
    @property bool isAlive() const pure nothrow @safe @nogc {
        with(FiberState) switch( state ) {
        case None:
            return false;
        case Done:
            assert(false, "Fiber is in an invalid state Done");
        default:
            return true;
        }
    }

private:
    // Entry point of every non-main fiber (installed by `setup`). Runs the fiber body, reports how it ended, marks
    // the fiber as Done and hands control to `theReactor.fiberTerminated`, whose return value tells whether the
    // body should be skipped on the next round of the loop.
    @notrace void wrapper() nothrow {
        bool skipBody;
        Throwable ex = null;

        try {
            // Nobody else will run it the first time the fiber is started. This results in some unfortunate
            // code duplication
            switchInto();
        } catch (FiberInterrupt ex2) {
            INFO!"Fiber %s killed by FiberInterrupt exception: %s"(identity, ex2.msg);
            skipBody = true;
        } catch(Throwable ex2) {
            ex = ex2;
            skipBody = true;
        }

        while (true) {
            DBG_ASSERT!"skipBody=false with pending exception"(ex is null || skipBody);
            scope(exit) ex = null;

            if( !skipBody ) {
                params.logsSavedContext = LogsFiberSavedContext.init;
                INFO!"Fiber %s started generation %s flags=0x%0x"(identity, incarnationCounter, _flags);

                ASSERT!"Reactor's current fiber isn't the running fiber" (theReactor.thisFiber is &this);
                ASSERT!"Fiber %s is in state %s instead of Running" (state == FiberState.Running, identity, state);

                try {
                    // Perform the actual fiber callback
                    params.fiberBody();
                } catch (FiberInterrupt ex2) {
                    INFO!"Fiber %s killed by FiberInterrupt exception: %s"(identity, ex2.msg);
                } catch (Throwable ex2) {
                    ex = ex2;
                }
            }

            ASSERT!"Fiber still member of fiber group at termination" (params.fgChain.owner is null);

            params.joinWaiters.signal();
            if( ex is null ) {
                INFO!"Fiber %s finished"(identity);
            } else {
                ERROR!"Fiber %s finished with exception: %s"(identity, ex.msg);
                LOG_EXCEPTION(ex);
            }

            params.fiberBody.clear();
            params.fiberName = null;
            params.fiberPtr = null;
            flag!"CALLBACK_SET" = false;
            ASSERT!"Fiber %s state is %s instead of Running at end of execution"(
                    state == FiberState.Running, identity, state);
            state = FiberState.Done;
            incarnationCounter++;
            if (ex !is null) {
                theReactor.forwardExceptionToMain(ex);
                assert(false);
            } else {
                skipBody = theReactor.fiberTerminated();
            }
        }
    }

    // Per-fiber housekeeping done when the fiber is switched in: select its exception buffer, FLS block and log
    // context, run any pending cross fiber hook, and throw the exception pending for this fiber, if any.
    void switchInto() @safe @nogc {
        // We might have a cross fiber hook. If we do, we need to repeat this part several times
        bool switched = false;
        do {
            switchCurrExcBuf( &params.currExcBuf );
            if (!flag!"SPECIAL") {
                params.flsBlock.switchTo();
            } else {
                FLSArea.switchToNone();
            }
            logSwitchInto();

            if( theReactor.crossFiberHook !is null ) {
                theReactor.performCrossFiberHook();
            } else {
                switched = true;
            }
        } while(!switched);

        if (flag!"HAS_EXCEPTION") {
            Throwable ex = params.currExcBuf.get();
            ASSERT!"Fiber has exception, but exception is null"(ex !is null);
            if (flag!"EXCEPTION_BT") {
                params.currExcBuf.setTraceback(ex);
                flag!"EXCEPTION_BT" = false;
            }

            flag!"HAS_EXCEPTION" = false;
            throw ex;
        }
    }

    void logSwitchInto() nothrow @safe @nogc{
        logSwitchFiber(&params.logsSavedContext, cast( Parameters!logSwitchFiber[1] )identity.value);
    }

private:
    // Initial value of `_swapEhContext`. Calls both swap functions and installs whichever one returned a non-null
    // previous context as the function to use from then on. Returns null if neither did.
    static extern(C) void* swapEhContextChooser(void * newContext) @nogc nothrow @notrace {
        DBG_ASSERT!"Context is not null on first invocation"(newContext is null);
        void* std = _d_eh_swapContext(newContext);
        void* dwarf = _d_eh_swapContextDwarf(newContext);

        if( std !is null ) {
            _swapEhContext = &_d_eh_swapContext;
            return std;
        } else if( dwarf !is null ) {
            _swapEhContext = &_d_eh_swapContextDwarf;
            return dwarf;
        }

        // Cannot tell which is correct yet
        return null;
    }
}


/**
  A handle to a running fiber.

  This handle expires automatically when the fiber stops running. Unless you know, semantically, that a fiber is still running, don't assume
  there is a running fiber attached to this handle.

  The handle will correctly claim invalidity even if a new fiber is launched with the same FiberId.
 */
struct FiberHandle {
private:
    FiberId identity;
    FiberIncarnation incarnation;

public:
    /// Returns whether the handle was set
    ///
    /// Unlike `isValid`, this does not check whether the handle is still valid. It only returns whether the handle
    /// is in initialized state.
    @property bool isSet() pure const nothrow @safe @nogc {
        return identity.isValid;
    }

    /// Returns whether the handle currently describes a running fiber.
    @property bool isValid() const nothrow @safe @nogc {
        return get() !is null;
    }

    /// the `FiberId` described by the handle. If the handle is no longer valid, will return FiberId.invalid
    ///
    /// Use `getFiberId` if you want the `FiberId` of a no-longer valid handle.
    @property FiberId fiberId() const nothrow @safe @nogc {
        if( isValid )
            return identity;

        return FiberId.invalid;
    }

    /// returns the original `FiberId` set for the handle, whether still valid or not
    FiberId getFiberId() const nothrow @safe @nogc pure {
        return identity;
    }

    /// Reset the handle to uninitialized state
    @notrace void reset() nothrow @safe @nogc {
        this = FiberHandle.init;
    }

package:
    /// Construct a handle describing `fib`. A null `fib` results in a handle with an invalid identity.
    this(ReactorFiber* fib) nothrow @safe @nogc {
        opAssign(fib);
    }

    /// Make the handle describe `fib`. A null `fib` sets the identity to `FiberId.invalid`.
    auto ref opAssign(ReactorFiber* fib) nothrow @safe @nogc {
        if (fib) {
            identity = fib.identity;
            incarnation = fib.incarnationCounter;
        }
        else {
            identity = FiberId.invalid;
        }
        return this;
    }

    /// Returns the fiber described by the handle, or null if the reactor is not running, the handle is not set, the
    /// fiber is in state `None` or its incarnation counter no longer matches the handle's.
    ReactorFiber* get() const nothrow @safe @nogc {
        if (!theReactor.isRunning)
            return null;

        if (!identity.isValid) {
            return null;
        }

        ReactorFiber* fiber = &theReactor.allFibers[to!FiberIdx(identity).value];

        DBG_ASSERT!"Fiber state is transient state Done"(fiber.state != FiberState.Done);
        if(fiber.state == FiberState.None || fiber.incarnationCounter != incarnation) {
            return null;
        }

        return fiber;
    }
}

/**
  The main scheduler for the micro-threading architecture.
 */
struct Reactor {
    // Prevent accidental copying
    @disable this(this);

    /// Delegates passed to `registerIdleCallback` must be of this signature
    alias IdleCallbackDlg = bool delegate(Duration);

    /// The options control aspects of the reactor's operation
    struct OpenOptions {
        /// Maximum number of fibers.
        ushort numFibers = 256;
        /// Stack size of each fiber (except the main fiber). The reactor will allocate numFiber*fiberStackSize during startup
        size_t fiberStackSize = 32*KB;
        /**
          How often does the GC's collection run.
458 459 The reactor uses unconditional periodic collection, rather than lazy evaluation one employed by the default GC 460 settings. This setting sets how often the collection cycle should run. See `gcRunThreshold` for how to not 461 run the GC collection when not needed. 462 */ 463 Duration gcInterval = 30.seconds; 464 465 /** 466 * Allocation threshold to trigger a GC run 467 * 468 * If the amount of memory allocated since the previous GC run is less than this amount of bytes, the GC scan 469 * will be skipped. 470 * 471 * Setting this value to 0 forces a run every `gcInterval`, regardless of how much was allocated. 472 */ 473 size_t gcRunThreshold = 16*MB; 474 475 /** 476 Base granularity of the reactor's timer. 477 478 Any scheduled task is scheduled at no better accuracy than timerGranularity. $(B In addition to) the fact that 479 a timer task may be delayed. As such, with a 1ms granularity, a task scheduled for 1.5ms from now is the same 480 as a task scheduled for 2ms from now, which may run 3ms from now. 481 */ 482 Duration timerGranularity = 1.msecs; 483 /** 484 Hogger detection threshold. 485 486 A hogger is a fiber that does not release the CPU to run other tasks for a long period of time. Often, this is 487 a result of a bug (i.e. - calling the OS's `sleep` instead of the reactor's). 488 489 Hogger detection works by measuring how long each fiber took until it allows switching away. If the fiber took 490 more than hoggerWarningThreshold, a warning is logged. 491 */ 492 Duration hoggerWarningThreshold = 200.msecs; 493 /** 494 Maximum desired fiber run time 495 496 A fiber should aim not to run more than this much time without a voluntary context switch. This value affects 497 the `shouldYield` and `considerYield` calls. 498 */ 499 Duration maxDesiredRunTime = 150.msecs; 500 /** 501 Hard hang detection. 502 503 This is a similar safeguard to that used by the hogger detection. 
If activated (disabled by default), it 504 premptively prompts the reactor every set time to see whether fibers are still being switched in/out. If it 505 finds that the same fiber is running, without switching out, for too long, it terminates the entire program. 506 */ 507 Duration hangDetectorTimeout = Duration.zero; 508 /** 509 Whether to enable fault handlers 510 511 The fault handlers catch program faults (such as segmentation faults) and log them with their backtrace. 512 */ 513 bool faultHandlersEnabled = true; 514 515 /// Maximal number of timers that can be simultaneously registered. 516 size_t numTimers = 10_000; 517 518 /// Number of threads servicing deferred tasks 519 uint numThreadsInPool = 4; 520 /// Worker thread stack size 521 size_t threadStackSize = 512*KB; 522 523 /// Whether we have enabled deferToThread 524 bool threadDeferralEnabled; 525 526 /** 527 Whether the reactor should register the default (fd processing) idle handler 528 529 If this value is set to `false`, the poller is still opened. It's idle function would not be automatically 530 called, however, so file operations might block indefinitely unless another mechanism (such as timer based) 531 is put in place to call it periodically. 532 533 The non-registered idle handler can be manually triggered by calling `poller.poll`. 534 */ 535 bool registerDefaultIdler = true; 536 537 version(unittest) { 538 /// Disable all GC collection during the reactor run time. Only available for UTs. 
539 bool utGcDisabled; 540 } else { 541 private enum utGcDisabled = false; 542 } 543 } 544 545 /// Used by `reportStats` to report statistics about the reactor 546 struct Stats { 547 ulong[FiberState.max+1] fibersHistogram; /// Total number of user fibers in each `FiberState` 548 ulong numContextSwitches; /// Number of time we switched between fibers 549 ulong idleCycles; /// Total number of idle cycles 550 551 /// Returns number of currently used fibers 552 @property ulong numUsedFibers() pure const nothrow @safe @nogc { 553 ulong ret; 554 555 with(FiberState) { 556 foreach( state; [Starting, Scheduled, Running, Sleeping]) { 557 ret += fibersHistogram[state]; 558 } 559 } 560 561 return ret; 562 } 563 564 /// Returns number of fibers available to be run 565 @property ulong numFreeFibers() pure const nothrow @safe @nogc { 566 ulong ret; 567 568 with(FiberState) { 569 foreach( state; [None]) { 570 ret += fibersHistogram[state]; 571 } 572 573 DBG_ASSERT!"%s fibers in Done state. Should be 0"(fibersHistogram[Done] == 0, fibersHistogram[Done]); 574 } 575 576 return ret; 577 } 578 } 579 580 private: 581 enum MAX_IDLE_CALLBACKS = 16; 582 enum TIMER_NUM_BINS = 256; 583 enum TIMER_NUM_LEVELS = 4; 584 enum MAX_DEFERRED_TASKS = 1024; 585 version(unittest) package enum UT_MAX_DEFERRED_TASKS = MAX_DEFERRED_TASKS; 586 587 enum GUARD_ZONE_SIZE = SYS_PAGE_SIZE; 588 589 enum NUM_SPECIAL_FIBERS = 2; 590 enum ZERO_DURATION = Duration.zero; 591 592 enum MainFiberId = FiberId(0); 593 enum IdleFiberId = FiberId(1); 594 595 bool _open; 596 bool _running; 597 bool _stopping; 598 bool _gcCollectionNeeded; 599 bool _gcCollectionForce; 600 bool _hangDetectorEnabled; 601 ubyte maxNumFibersBits; // Number of bits sufficient to represent the maximal number of fibers 602 bool nothingScheduled; // true if there is no scheduled fiber 603 enum HIGH_PRIORITY_SCHEDULES_RATIO = 2; // How many high priority schedules before shceduling a low priority fiber 604 ubyte highPrioritySchedules; // Number of 
high priority schedules done 605 FiberIdx.UnderlyingType maxNumFibersMask; 606 int reactorReturn; 607 int criticalSectionNesting; 608 OpenOptions optionsInEffect; 609 Stats stats; 610 GC.Stats lastGCStats; 611 612 MmapBuffer fiberStacks; 613 MmapArray!ReactorFiber allFibers; 614 LinkedQueueWithLength!(ReactorFiber*) freeFibers; 615 enum FiberPriorities { NORMAL, HIGH, IMMEDIATE }; 616 LinkedListWithOwner!(ReactorFiber*) scheduledFibersNormal, scheduledFibersHigh, scheduledFibersImmediate; 617 618 ReactorFiber* _thisFiber; 619 ReactorFiber* mainFiber; 620 ReactorFiber* idleFiber; 621 FixedArray!(IdleCallbackDlg, MAX_IDLE_CALLBACKS) idleCallbacks; 622 // Point to idleCallbacks as a range, in case it gets full and we need to spill over to GC allocation 623 IdleCallbackDlg[] actualIdleCallbacks; 624 __gshared Timer hangDetectorTimer; 625 626 SignalHandlerValue!TscTimePoint fiberRunStartTime; 627 void delegate() nothrow @nogc @safe crossFiberHook; 628 FiberHandle crossFiberHookCaller; // FiberHandle to return to after performing the hook 629 630 alias TimedCallbackGeneration = TypedIdentifier!("TimedCallbackGeneration", ulong, ulong.max, ulong.max); 631 struct TimedCallback { 632 TimedCallback* _next, _prev; 633 timeQueue.OwnerAttrType _owner; 634 TimedCallbackGeneration generation; 635 TscTimePoint timePoint; 636 ulong intervalCycles; // How many cycles between repeatetions. Zero means non-repeating 637 638 Closure closure; 639 } 640 641 // TODO change to mmap pool or something 642 SimplePool!(TimedCallback) timedCallbacksPool; 643 TimedCallbackGeneration.Allocator timedCallbackGeneration; 644 CascadingTimeQueue!(TimedCallback*, TIMER_NUM_BINS, TIMER_NUM_LEVELS, true) timeQueue; 645 646 ThreadPool!MAX_DEFERRED_TASKS threadPool; 647 version(unittest) package ref auto utThreadPool() inout { return threadPool; } 648 649 public: 650 /// Report whether the reactor has been properly opened (i.e. - setup has been called). 
651 @property bool isOpen() const pure nothrow @safe @nogc { 652 return _open; 653 } 654 655 /// Report whether the reactor is currently active 656 /// 657 /// Unlike `isRunning`, this will return `false` during the reactor shutdown. 658 @property bool isActive() const pure nothrow @safe @nogc { 659 return _running && !_stopping; 660 } 661 662 /// Report whether the reactor is currently running 663 @property bool isRunning() const pure nothrow @safe @nogc { 664 return _running; 665 } 666 667 /** 668 Set the reactor up for doing work. 669 670 All options must be set before calling this function. 671 */ 672 void setup(OpenOptions options = OpenOptions.init) { 673 INFO!"Setting up reactor"(); 674 675 assert (!isOpen, "reactor.setup called twice"); 676 _open = true; 677 assert (thread_isMainThread); 678 assert (options.numFibers > NUM_SPECIAL_FIBERS); 679 reactorReturn = 0; 680 optionsInEffect = options; 681 682 maxNumFibersBits = bitsInValue(optionsInEffect.numFibers - 1); 683 maxNumFibersMask = createBitMask!(FiberIdx.UnderlyingType)(maxNumFibersBits); 684 685 stats = Stats.init; 686 stats.fibersHistogram[FiberState.None] = options.numFibers; 687 688 const stackPerFib = (((options.fiberStackSize + SYS_PAGE_SIZE - 1) / SYS_PAGE_SIZE) + 1) * SYS_PAGE_SIZE; 689 fiberStacks.allocate(stackPerFib * options.numFibers); 690 allFibers.allocate(options.numFibers); 691 692 _thisFiber = null; 693 criticalSectionNesting = 0; 694 idleCallbacks.length = 0; 695 actualIdleCallbacks = null; 696 697 foreach(i, ref fib; allFibers) { 698 auto stack = fiberStacks[i * stackPerFib .. (i + 1) * stackPerFib]; 699 errnoEnforce(mprotect(stack.ptr, GUARD_ZONE_SIZE, PROT_NONE) == 0, "Guard zone protection failed"); 700 fib.setup(stack[GUARD_ZONE_SIZE .. 
$], i==0); 701 702 if (i >= NUM_SPECIAL_FIBERS) { 703 freeFibers.append(&fib); 704 } 705 } 706 707 mainFiber = &allFibers[MainFiberId.value]; 708 mainFiber.flag!"SPECIAL" = true; 709 mainFiber.flag!"CALLBACK_SET" = true; 710 mainFiber.state = FiberState.Running; 711 setFiberName(mainFiber, "mainFiber", &mainloop); 712 713 idleFiber = &allFibers[IdleFiberId.value]; 714 idleFiber.flag!"SPECIAL" = true; 715 idleFiber.flag!"CALLBACK_SET" = true; 716 idleFiber.params.fiberBody.set(&idleLoop); 717 idleFiber.state = FiberState.Sleeping; 718 setFiberName(idleFiber, "idleFiber", &idleLoop); 719 720 timedCallbacksPool.open(options.numTimers, true); 721 timeQueue.open(options.timerGranularity); 722 723 if( options.faultHandlersEnabled ) 724 registerFaultHandlers(); 725 726 if( options.threadDeferralEnabled ) 727 threadPool.open(options.numThreadsInPool, options.threadStackSize); 728 729 import mecca.reactor.io.fd; 730 _openReactorEpoll(); 731 732 if(options.registerDefaultIdler) { 733 import mecca.reactor.subsystems.poller; 734 theReactor.registerIdleCallback(&poller.reactorIdle); 735 } 736 737 import mecca.reactor.io.signals; 738 reactorSignal._open(); 739 740 enum TIMER_GRANULARITY = 4; // Number of wakeups during the monitored period 741 Duration threshold = optionsInEffect.hangDetectorTimeout / TIMER_GRANULARITY; 742 hangDetectorTimer = Timer(threshold, &hangDetectorHandler); 743 } 744 745 /** 746 Shut the reactor down. 
747 */ 748 void teardown() { 749 INFO!"Tearing down reactor"(); 750 751 ASSERT!"reactor teardown called on non-open reactor"(isOpen); 752 ASSERT!"reactor teardown called on still running reactor"(!isRunning); 753 ASSERT!"reactor teardown called inside a critical section"(criticalSectionNesting==0); 754 755 import mecca.reactor.io.signals; 756 reactorSignal._close(); 757 758 import mecca.reactor.io.fd; 759 _closeReactorEpoll(); 760 761 foreach(i, ref fib; allFibers) { 762 fib.teardown(); 763 } 764 765 if( optionsInEffect.threadDeferralEnabled ) 766 threadPool.close(); 767 switchCurrExcBuf(null); 768 769 // disableGCTracking(); 770 771 if( optionsInEffect.faultHandlersEnabled ) 772 deregisterFaultHandlers(); 773 774 allFibers.free(); 775 fiberStacks.free(); 776 timeQueue.close(); 777 timedCallbacksPool.close(); 778 779 setToInit(freeFibers); 780 setToInit(scheduledFibersNormal); 781 setToInit(scheduledFibersHigh); 782 setToInit(scheduledFibersImmediate); 783 nothingScheduled = true; 784 785 _thisFiber = null; 786 mainFiber = null; 787 idleFiber = null; 788 idleCallbacks.length = 0; 789 actualIdleCallbacks = null; 790 791 _open = false; 792 } 793 794 /** 795 Register an idle handler callback. 796 797 The reactor handles scheduling and fibers switching, but has no built-in mechanism for scheduling sleeping fibers 798 back to execution (except fibers sleeping on a timer, of course). Mechanisms such as file descriptor watching are 799 external, and are registered using this function. 800 801 The idler callback should receive a timeout variable. This indicates the time until the closest timer expires. If 802 no other event comes in, the idler should strive to wake up after that long. Waking up sooner will, likely, cause 803 the idler to be called again. Waking up later will delay timer tasks (which is allowed by the API contract). 804 805 It is allowed to register more than one idler callback. 
Doing so, however, will cause $(B all) of them to be 806 called with a timeout of zero (i.e. - don't sleep), resulting in a busy wait for events. 807 808 The idler is expected to return a boolean value indicating whether the time spent inside the idler is to be 809 counted as idle time, or whether that's considered "work". This affects the results returned by the `idleCycles` 810 field returned by the `reactorStats`. 811 */ 812 void registerIdleCallback(IdleCallbackDlg dg) nothrow @safe { 813 // You will notice our deliberate lack of function to unregister 814 if( actualIdleCallbacks.length==idleCallbacks.capacity ) { 815 WARN!"Idle callbacks capacity reached - switching to GC allocated list"(); 816 } 817 818 if( actualIdleCallbacks.length<idleCallbacks.capacity ) { 819 idleCallbacks ~= dg; 820 actualIdleCallbacks = idleCallbacks[]; 821 } else { 822 actualIdleCallbacks ~= dg; 823 } 824 825 DEBUG!"%s idle callbacks registered"(actualIdleCallbacks.length); 826 } 827 828 /** 829 * Spawn a new fiber for execution. 830 * 831 * Parameters: 832 * The first argument must be the function/delegate to call inside the new fiber. If said callable accepts further 833 * arguments, then they must be provided as further arguments to spawnFiber. 834 * 835 * Returns: 836 * A FiberHandle to the newly created fiber. 837 */ 838 @notrace FiberHandle spawnFiber(T...)(T args) nothrow @safe @nogc { 839 static assert(T.length>=1, "Must pass at least the function/delegate to spawnFiber"); 840 static assert(isDelegate!(T[0]) || isFunctionPointer!(T[0]), 841 "spawnFiber first argument must be function or delegate"); 842 static assert( is( ReturnType!(T[0]) == void ), "spawnFiber callback must be of type void" ); 843 auto fib = _spawnFiber(false); 844 fib.params.fiberBody.set(args); 845 setFiberName(fib, "Fiber", args[0]); 846 return FiberHandle(fib); 847 } 848 849 /** 850 * Spawn a new fiber for execution. 851 * 852 * Params: 853 * F = The function or delegate to call inside the fiber. 
854 * args = The arguments for F 855 * 856 * Returns: 857 * A FiberHandle to the newly created fiber. 858 */ 859 @notrace FiberHandle spawnFiber(alias F)(Parameters!F args) 860 if (!isType!F) { 861 static assert( is( ReturnType!F == void ), "spawnFiber callback must be of type void" ); 862 auto fib = _spawnFiber(false); 863 import std.algorithm: move; 864 // pragma(msg, genMoveArgument( args.length, "fib.params.fiberBody.set!F", "args" ) ); 865 mixin( genMoveArgument( args.length, "fib.params.fiberBody.set!F", "args" ) ); 866 867 setFiberName(fib, F.mangleof, &F); 868 return FiberHandle(fib); 869 } 870 871 /// Returns whether currently running fiber is the idle fiber. 872 @property bool isIdle() pure const nothrow @safe @nogc { 873 return thisFiber is idleFiber; 874 } 875 876 /// Returns whether currently running fiber is the main fiber. 877 @property bool isMain() pure const nothrow @safe @nogc { 878 return thisFiber is mainFiber; 879 } 880 881 /// Returns whether currently running fiber is a special (i.e. 
- non-user) fiber 882 @property bool isSpecialFiber() const nothrow @safe @nogc { 883 return thisFiber.flag!"SPECIAL"; 884 } 885 886 /// Returns a FiberHandle to the currently running fiber 887 @property FiberHandle currentFiberHandle() nothrow @safe @nogc { 888 // XXX This assert may be incorrect, but it is easier to remove an assert than to add one 889 DBG_ASSERT!"Should not blindly get fiber handle of special fiber %s"(!isSpecialFiber, currentFiberId); 890 return FiberHandle(thisFiber); 891 } 892 @property package ReactorFiber* currentFiberPtr() nothrow @safe @nogc { 893 return getCurrentFiberPtr(false); 894 } 895 @notrace private ReactorFiber* getCurrentFiberPtr(bool specialOkay) nothrow @safe @nogc { 896 // XXX This assert may be incorrect, but it is easier to remove an assert than to add one 897 ASSERT!"Should not blindly get fiber handle of special fibers %s"(specialOkay || !isSpecialFiber, currentFiberId); 898 return thisFiber; 899 } 900 /** 901 Returns the FiberId of the currently running fiber. 902 903 You should almost never store the FiberId for later comparison or pass it to another fiber. Doing so risks having the current fiber 904 die and another one spawned with the same FiberId. If that's what you want to do, use currentFiberHandle instead. 905 */ 906 @property FiberId currentFiberId() const nothrow @safe @nogc { 907 return thisFiber.identity; 908 } 909 910 /// Returns the `FiberState` of the specified fiber. 911 FiberState getFiberState(FiberHandle fh) const nothrow @safe @nogc { 912 auto fiber = fh.get(); 913 914 if( fiber is null ) 915 return FiberState.None; 916 917 if( fiber.state==FiberState.Sleeping && fiber.flag!"SCHEDULED" ) 918 return FiberState.Scheduled; 919 920 return fiber.state; 921 } 922 923 /** 924 Starts up the reactor. 925 926 The reactor should already have at least one user fiber at that point, or it will start, but sit there and do nothing. 
927 928 This function "returns" only after the reactor is stopped and no more fibers are running. 929 930 Returns: 931 The return value from `start` is the value passed to the `stop` function. 932 */ 933 int start() { 934 _isReactorThread = true; 935 scope(exit) _isReactorThread = false; 936 937 META!"Reactor started"(); 938 assert( idleFiber !is null, "Reactor started without calling \"setup\" first" ); 939 mainloop(); 940 META!"Reactor stopped"(); 941 942 return reactorReturn; 943 } 944 945 /** 946 Stop the reactor, killing all fibers. 947 948 This will kill all running fibers and trigger a return from the original call to Reactor.start. 949 950 Typically, this function call never returns (throws ReactorExit). However, if stop is called while already in the 951 process of stopping, it will just return. It is, therefor, not wise to rely on that fact. 952 953 Params: 954 reactorReturn = The return value to be returned from `start` 955 */ 956 void stop(int reactorReturn = 0) @safe @nogc { 957 if( !isActive ) { 958 ERROR!"Reactor.stop called, but reactor is not running"(); 959 return; 960 } 961 962 INFO!"Stopping reactor with exit code %s"(reactorReturn); 963 this.reactorReturn = reactorReturn; 964 965 _stopping = true; 966 if( !isMain() ) { 967 resumeSpecialFiber(mainFiber); 968 } 969 970 throw mkEx!ReactorExit(); 971 } 972 973 /** 974 enter a no-fiber switch piece of code. 975 976 If the code tries to switch away from the current fiber before leaveCriticalSection is called, the reactor will throw an assert. 977 This helps writing code that does interruption sensitive tasks without locks and making sure future changes don't break it. 978 979 Critical sections are nestable. 980 981 Also see `criticalSection` below. 982 */ 983 void enterCriticalSection() nothrow @safe @nogc { 984 pragma(inline, true); 985 criticalSectionNesting++; 986 } 987 988 /// leave the innermost critical section. 
void leaveCriticalSection() nothrow @safe @nogc {
    pragma(inline, true);
    // An unbalanced leave would wrap the nesting counter; catch it here
    assert (criticalSectionNesting > 0);
    criticalSectionNesting--;
}

/// Reports whether execution is currently within a critical section
@property bool isInCriticalSection() const pure nothrow @safe @nogc {
    return criticalSectionNesting > 0;
}

/** Make sure we are allowed to context switch from this point.
 *
 * This will be called automatically if an actual context switch is attempted. You might wish to call this function
 * explicitly, however, from contexts that $(I might) context switch, so that they fail even if they don't actually
 * attempt it, in accordance with the "fail early" doctrine.
 *
 * Params:
 *  message = text included in the assertion message if the check fails
 */
void assertMayContextSwitch(string message = "Simulated context switch") nothrow @safe @nogc {
    pragma(inline, true);
    ASSERT!"Context switch from outside the reactor thread: %s"(isReactorThread, message);
    ASSERT!"Context switch while inside a critical section: %s"(!isInCriticalSection, message);
}

/**
 * Return a RAII object handling a critical section
 *
 * The function enters a critical section. Unlike enterCriticalSection, however, there is no need to explicitly call
 * leaveCriticalSection. Instead, the function returns a variable that calls leaveCriticalSection when it goes out of scope.
 *
 * There are two advantages for using criticalSection over enter/leaveCriticalSection. 
The first is for nicer scoping:
 * ---
 * with(theReactor.criticalSection) {
 *   // Code in critical section goes here
 * }
 * ---
 *
 * The second case is if you need to switch, within the same code, between critical section and none:
 * ---
 * auto cs = theReactor.criticalSection;
 *
 * // Some code
 * cs.leave();
 * // Some code that sleeps
 * cs.enter();
 *
 * // The rest of the critical section code
 * ---
 *
 * The main advantage over enter/leaveCriticalSection is that this way, the critical section never leaks. Any exception thrown, either
 * from the code inside the critical section or the code temporarily out of it, will result in the correct number of leaveCriticalSection
 * calls to zero out the effect.
 *
 * Also note that there is no need to call leave at the end of the code. Cs's destructor will do it for us $(B if necessary).
 */
@property auto criticalSection() nothrow @safe @nogc {
    pragma(inline, true);
    static struct CriticalSection {
    private:
        // True while this object is responsible for one level of critical-section nesting
        bool owner = true;

    public:
        // Copying would make two objects release the same nesting level
        @disable this(this);
        ~this() nothrow @safe @nogc {
            if( owner )
                leave();
        }

        // Re-enter the critical section after an explicit `leave`
        void enter() nothrow @safe @nogc {
            DBG_ASSERT!"The same critical section was activated twice"(!owner);
            theReactor.enterCriticalSection();
            owner = true;
        }

        // Leave the critical section before the object goes out of scope
        void leave() nothrow @safe @nogc {
            DBG_ASSERT!"Asked to leave critical section not owned by us"(owner);
            theReactor.leaveCriticalSection();
            owner = false;
        }
    }
    // The returned object starts out with `owner == true`, matching this enter
    enterCriticalSection();
    return CriticalSection();
}
unittest {
    // Yielding inside a critical section must assert; yielding after it is closed must work
    testWithReactor({
            {
                with( theReactor.criticalSection ) {
                    assertThrows!AssertError( theReactor.yield() );
                }
            }

            theReactor.yield();
            });
}

/**
 * Temporarily surrender the CPU for other fibers to run.
*
* Unlike suspend, the current fiber will automatically resume running after any currently scheduled fibers are finished.
*/
@notrace void yield() @safe @nogc {
    // TODO both scheduled and running is not a desired state to be in other than here.
    // Queue ourselves first, then switch away; we run again when our turn in the queue comes up
    resumeFiber(thisFiber);
    suspendCurrentFiber();
}

/**
 * Returns whether the fiber is already running for a long time.
 *
 * Fibers that run for too long prevent other fibers from operating properly. On the other hand, fibers that initiate
 * a context switch needlessly load the system with overhead.
 *
 * This function reports whether the fiber is already running more than the desired time.
 *
 * The base time used is taken from `OpenOptions.maxDesiredRunTime`.
 *
 * While the reactor is stopping, this always returns `true`.
 *
 * Params:
 *  tolerance = A multiplier for the amount of acceptable run time. Specifying 4 here will give you 4 times as much
 *      time to run before a context switch is deemed necessary.
 */
@notrace bool shouldYield(uint tolerance = 1) const nothrow @safe @nogc {
    if( _stopping )
        return true;

    auto now = TscTimePoint.hardNow();
    return (now - fiberRunStartTime) > tolerance*optionsInEffect.maxDesiredRunTime;
}

/**
  Perform yield if fiber is running long enough.

  Arguments and meaning are the same as for `shouldYield`.

  Returns:
  Whether a yield actually took place.
 */
@notrace bool considerYield(uint tolerance = 1) @safe @nogc {
    if( shouldYield(tolerance) ) {
        yield();
        return true;
    }

    return false;
}

/**
 * Give a fiber temporary priority in execution.
 *
 * Setting fiber priority means that the next time this fiber is scheduled, it will be scheduled ahead of other
 * fibers already scheduled to be run.
 *
 * Returns a RAII object that sets the priority back when destroyed. Typically, that happens at the end of the scope.
*
* Can be used as:
* ---
* with(boostFiberPriority()) {
*     // High priority stuff goes here
* }
* ---
*/
auto boostFiberPriority() nothrow @safe @nogc {
    static struct FiberPriorityRAII {
    private:
        // Fiber on which the priority was raised
        FiberHandle fh;

    public:
        @disable this(this);
        this(FiberHandle fh) @safe @nogc nothrow {
            this.fh = fh;
        }

        ~this() @safe @nogc nothrow {
            // Nothing to undo if the handle no longer refers to a live fiber
            if( fh.isValid ) {
                ASSERT!"Cannot move priority watcher between fibers (opened on %s)"(
                        fh == theReactor.currentFiberHandle, fh.fiberId);

                ASSERT!"Trying to reset priority which is not set"( theReactor.thisFiber.flag!"PRIORITY" );
                theReactor.thisFiber.flag!"PRIORITY" = false;
            }
        }
    }

    ASSERT!"Cannot ask to prioritize a non-user fiber"(!isSpecialFiber);
    ASSERT!"Asked to prioritize a fiber %s which is already high priority"(
            !thisFiber.flag!"PRIORITY", currentFiberId );
    DEBUG!"Setting fiber priority"();
    thisFiber.flag!"PRIORITY" = true;

    return FiberPriorityRAII(currentFiberHandle);
}

/// Handle used to manage registered timers
struct TimerHandle {
private:
    TimedCallback* callback;
    // Generation of `callback` at the time the handle was created; compared again in `isValid`
    TimedCallbackGeneration generation;

public:

    // Build a handle for a freshly registered callback. The callback must not be null.
    this(TimedCallback* callback) nothrow @safe @nogc {
        DBG_ASSERT!"Constructing TimedHandle from null callback"(callback !is null);
        this.callback = callback;
        this.generation = callback.generation;
    }

    /// Returns whether the handle was used
    ///
    /// returns:
    /// `true` if the handle was set. `false` if it is still in its init value.
    @property bool isSet() const pure nothrow @safe @nogc {
        return callback !is null;
    }

    /// Returns whether the handle describes a currently registered task
    @property bool isValid() const nothrow @safe @nogc {
        return
            isSet() &&
            theReactor.isOpen &&
            callback._owner !is null &&
            generation == callback.generation;
    }

    /// Revert the handle to init value, forgetting the timer it points to
    ///
    /// This call will $(B not) cancel the actual timer. Use `cancelTimer` for that.
    @notrace void reset() nothrow @safe @nogc {
        callback = null;
        generation = TimedCallbackGeneration.invalid;
    }

    /// Cancel a currently registered timer
    ///
    /// Does nothing to the timer if the handle is not valid. The handle is reset in either case.
    void cancelTimer() nothrow @safe @nogc {
        if( isValid ) {
            theReactor.cancelTimerInternal(this);
        }

        reset();
    }
}

/**
 * Registers a timer task.
 *
 * Params:
 *  F = the callable to be invoked when the timer expires
 *  timeout = when the timer is to be called
 *  params = the parameters to call F with
 *
 * Returns:
 *  A handle to the just registered timer.
 */
TimerHandle registerTimer(alias F)(Timeout timeout, Parameters!F params) nothrow @safe @nogc {
    TimedCallback* callback = allocTimedCallback();
    callback.closure.set(&F, params);
    callback.timePoint = timeout.expiry;
    // Zero interval marks a one-shot timer (see runTimedCallbacks)
    callback.intervalCycles = 0;

    timeQueue.insert(callback);

    return TimerHandle(callback);
}

/// ditto
TimerHandle registerTimer(alias F)(Duration timeout, Parameters!F params) nothrow @safe @nogc {
    return registerTimer!F(Timeout(timeout), params);
}

/**
 * Register a timer callback
 *
 * Same as `registerTimer!F`, except with the callback as an argument. Callback cannot accept parameters.
*/
TimerHandle registerTimer(T)(Timeout timeout, T dg) nothrow @safe @nogc {
    TimedCallback* callback = allocTimedCallback();
    callback.closure.set(dg);
    callback.timePoint = timeout.expiry;
    // Zero interval marks a one-shot timer
    callback.intervalCycles = 0;

    timeQueue.insert(callback);

    return TimerHandle(callback);
}

/// ditto
TimerHandle registerTimer(T)(Duration timeout, T dg) nothrow @safe @nogc {
    return registerTimer!T(Timeout(timeout), dg);
}

/**
 * registers a timer that will repeatedly trigger at set intervals.
 *
 * You do not control precisely when the callback is invoked, only how often. The invocations are going to be evenly
 * spaced out (best effort), but the first invocation might be almost immediately after the call or a whole
 * `interval` after.
 *
 * You can use the `firstRun` argument to control when the first invocation is going to be (but the same rule will
 * still apply to the second one).
 *
 * Params:
 *  interval = the frequency with which the callback will be called.
 *  dg = the callback to invoke
 *  F = an alias to the function to be called
 *  params = the arguments to pass to F on each invocation
 *  firstRun = if supplied, directly sets when is the first time the timer shall run. The value does not have any
 *      special constraints.
*/
TimerHandle registerRecurringTimer(Duration interval, void delegate() dg) nothrow @safe @nogc {
    // The helper has already queued the callback; only the closure remains to be filled in
    TimedCallback* callback = _registerRecurringTimer(interval);
    callback.closure.set(dg);
    return TimerHandle(callback);
}

/// ditto
TimerHandle registerRecurringTimer(alias F)(Duration interval, Parameters!F params) nothrow @safe @nogc {
    TimedCallback* callback = _registerRecurringTimer(interval);
    callback.closure.set(&F, params);
    return TimerHandle(callback);
}

/// ditto
TimerHandle registerRecurringTimer(Duration interval, void delegate() dg, Timeout firstRun) nothrow @safe @nogc {
    TimedCallback* callback = allocRecurringTimer(interval);
    // Explicit first expiry instead of the interval-aligned one
    callback.timePoint = firstRun.expiry;
    callback.closure.set(dg);

    timeQueue.insert(callback);

    return TimerHandle(callback);
}

/// ditto
TimerHandle registerRecurringTimer(alias F)(Duration interval, Parameters!F params, Timeout firstRun) nothrow @safe @nogc {
    TimedCallback* callback = allocRecurringTimer(interval);
    callback.timePoint = firstRun.expiry;
    callback.closure.set(&F, params);

    timeQueue.insert(callback);

    return TimerHandle(callback);
}

// Remove the callback behind `handle` from the time queue and return it to the pool. The handle must be valid.
private void cancelTimerInternal(TimerHandle handle) nothrow @safe @nogc {
    DBG_ASSERT!"cancelTimerInternal called with invalid handle"( handle.isValid );
    timeQueue.cancel(handle.callback);
    timedCallbacksPool.release(handle.callback);
}

/**
 * Schedule a callback for out of bounds immediate execution.
 *
 * For all intents and purposes, `call` is identical to `registerTimer` with an expired timeout.
*/
TimerHandle call(alias F)(Parameters!F params) nothrow @safe @nogc {
    return registerTimer!F(Timeout.elapsed, params);
}

/// ditto
TimerHandle call(T)(T dg) nothrow @safe @nogc {
    return registerTimer(Timeout.elapsed, dg);
}

/// Suspend the current fiber for a specified amount of time
void sleep(Duration duration) @safe @nogc {
    sleep(Timeout(duration));
}

/// ditto
void sleep(Timeout until) @safe @nogc {
    assert(until != Timeout.init, "sleep argument uninitialized");
    auto timerHandle = registerTimer!resumeFiber(until, currentFiberHandle, false);
    // If the suspend exits by exception, the wake-up timer must not outlive it
    scope(failure) timerHandle.cancelTimer();

    suspendCurrentFiber();
}

/**
  Resume a suspended fiber

  You are heartily encouraged not to use this function directly. In almost all cases, it is better to use one of the
  synchronization primitives instead.

  A handle that no longer resolves to a fiber is silently ignored.

  Params:
  handle = the handle of the fiber to be resumed.
  priority = If true, schedule the fiber before all currently scheduled fibers. If the fiber is already scheduled
  (`getFiberState` returns `Scheduled`), this will move it to the top of the queue.
 */
@notrace void resumeFiber(FiberHandle handle, bool priority = false) nothrow @safe @nogc {
    // Resolve the handle once and reuse the pointer that was actually null-checked
    auto fiber = handle.get();

    if( fiber !is null )
        resumeFiber(fiber, priority);
}

/**
  Suspend the current fiber

  If `timeout` is given and expires, the suspend will throw a `TimeoutExpired` exception.

  You are heartily encouraged not to use this function directly. In almost all cases, it is better to use one of the
  synchronization primitives instead.
*/
@notrace void suspendCurrentFiber(Timeout timeout) @trusted @nogc {
    // No timer is needed for an infinite wait
    if (timeout == Timeout.infinite)
        return suspendCurrentFiber();

    assertMayContextSwitch("suspendCurrentFiber");

    TimerHandle timeoutHandle;
    // Covers every exit path: a handle still in its init state (or cleared by `resumer`) makes this a no-op
    scope(exit) timeoutHandle.cancelTimer();
    bool timeoutExpired;

    // An already elapsed timeout fails right away, without suspending
    if (timeout == Timeout.elapsed) {
        throw mkEx!TimeoutExpired;
    }

    // Timer callback. `cookie` and `timeoutExpired` point at the locals above, which stay alive
    // because this function does not return before the fiber runs again.
    static void resumer(FiberHandle fibHandle, TimerHandle* cookie, bool* timeoutExpired) nothrow @safe @nogc{
        // The timer has fired, so the suspended fiber must not try to cancel it later
        *cookie = TimerHandle.init;
        ReactorFiber* fib = fibHandle.get;
        assert( fib !is null, "Fiber disappeared while suspended with timer" );

        // Throw TimeoutExpired only if we're the ones who resumed the fiber. this prevents a race when
        // someone else had already woken the fiber, but it just didn't get time to run while the timer expired.
        // this probably indicates fibers hogging the CPU for too long (starving others)
        *timeoutExpired = ! fib.flag!"SCHEDULED";

        /+
           if (! *timeoutExpired)
           fib.WARN_AS!"#REACTOR fiber resumer invoked, but fiber already scheduled (starvation): %s scheduled, %s pending"(
           theReactor.scheduledFibers.length, theReactor.pendingFibers.length);
         +/

        theReactor.resumeFiber(fib);
    }

    timeoutHandle = registerTimer!resumer(timeout, currentFiberHandle, &timeoutHandle, &timeoutExpired);
    switchToNext();

    if( timeoutExpired )
        throw mkEx!TimeoutExpired();
}

/// ditto
@notrace void suspendCurrentFiber() @safe @nogc {
    switchToNext();
}

/**
 * run a function inside a different thread.
 *
 * Runs a function inside a thread. Use this function to run CPU bound processing without freezing the other fibers.
 *
 * `Fini`, if provided, will be called with the same parameters in the reactor thread once the thread has finished.
* `Fini` will be called unconditionally, even if the fiber that launched the thread terminates before the thread
* has finished.
*
* The `Fini` function runs within a `criticalSection`, and must not sleep.
*
* Returns:
* The return argument from the delegate is returned by this function.
*
* Throws:
* Will throw TimeoutExpired if the timeout expires.
*
* Will rethrow whatever the thread throws in the waiting fiber.
*/
auto deferToThread(alias F, alias Fini = null)(Parameters!F args, Timeout timeout = Timeout.infinite) @nogc {
    DBG_ASSERT!"deferToThread called but thread deferral isn't enabled in the reactor"(
            optionsInEffect.threadDeferralEnabled);
    // Note the argument order: the thread pool takes the timeout first
    return threadPool.deferToThread!(F, Fini)(timeout, args);
}

/// ditto
auto deferToThread(F)(scope F dlg, Timeout timeout = Timeout.infinite) @nogc {
    DBG_ASSERT!"deferToThread called but thread deferral isn't enabled in the reactor"(optionsInEffect.threadDeferralEnabled);
    // Adapter so that a callable value can be passed through the alias-based thread pool entry point
    static auto glueFunction(F dlg) {
        return dlg();
    }

    return threadPool.deferToThread!glueFunction(timeout, dlg);
}

/**
 * forward an exception to another fiber
 *
 * This function throws an exception in another fiber. The fiber will be scheduled to run.
 *
 * There is a difference in semantics between the two forms. The first form throws an identical copy of the exception in the
 * target fiber. The second form forms a new exception to be thrown in that fiber.
 *
 * One place where this difference matters is with the stack trace the thrown exception will have. With the first form, the
 * stack trace will be the stack trace `ex` has, wherever it is (usually not in the fiber in which the exception was thrown).
 * With the second form, the stack trace will be of the target fiber, which means it will point back to where the fiber went
 * to sleep.
*/
bool throwInFiber(FiberHandle fHandle, Throwable ex) nothrow @safe @nogc {
    if( !fHandle.isValid )
        return false;

    ExcBuf* fiberEx = prepThrowInFiber(fHandle, false);

    // null means the exception could not be queued (e.g. the fiber already has one pending)
    if( fiberEx is null )
        return false;

    fiberEx.set(ex);
    auto fib = fHandle.get();
    // Schedule the target through the immediate queue so that it gets to see the exception
    resumeFiber(fib, true);
    return true;
}

/// ditto
bool throwInFiber(T : Throwable, string file = __FILE_FULL_PATH__, size_t line = __LINE__, A...)
        (FiberHandle fHandle, auto ref A args) nothrow @safe @nogc
{
    pragma(inline, true);
    if( !fHandle.isValid )
        return false;

    ExcBuf* fiberEx = prepThrowInFiber(fHandle, true);

    if( fiberEx is null )
        return false;

    fiberEx.construct!T(file, line, false, args);
    auto fib = fHandle.get();
    resumeFiber(fib, true);
    return true;
}

/** Request that a GC collection take place ASAP
 *
 * Does nothing while the reactor is stopping.
 *
 * Params:
 *  waitForCollection = whether to wait for the collection to finish before returning. Waiting is done by
 *      yielding, so it must not be requested from a special fiber.
 */
void requestGCCollection(bool waitForCollection = true) @safe @nogc {
    if( theReactor._stopping )
        return;

    DEBUG!"Requesting explicit GC collection"();
    requestGCCollectionInternal(true);

    if( waitForCollection ) {
        // The check forbids special fibers from waiting; the message now says so
        DBG_ASSERT!"Special fiber must not request synchronous GC collection"( !isSpecialFiber );
        yield();
    }
}

// Record that a collection is wanted and, unless already running in the main fiber, schedule the main fiber.
private void requestGCCollectionInternal(bool force) nothrow @safe @nogc {
    _gcCollectionNeeded = true;
    _gcCollectionForce = force;
    if( theReactor.currentFiberId != MainFiberId ) {
        theReactor.resumeSpecialFiber(theReactor.mainFiber);
    }
}

/// Property for disabling/enabling the hang detector.
///
/// The hang detector must be configured during `setup` by setting `OpenOptions.hangDetectorTimeout`.
@property bool hangDetectorEnabled() pure const nothrow @safe @nogc {
    return _hangDetectorEnabled;
}

/// ditto
@property void hangDetectorEnabled(bool enabled) pure nothrow @safe @nogc {
    // Disabling is always allowed; enabling requires a non-zero configured timeout
    ASSERT!"Cannot enable an unconfigured hang detector"(
            !enabled || optionsInEffect.hangDetectorTimeout !is Duration.zero);
    _hangDetectorEnabled = enabled;
}

/// Iterate all fibers
///
/// The returned range holds a critical section for as long as it exists, so no context switch may happen while
/// iterating. Each element is a `FiberHandle` of a live, non-special fiber.
auto iterateFibers() const nothrow @safe @nogc {
    static struct FibersIterator {
    private:
        // Number of live fibers not yet visited
        uint numLivingFibers;
        // Slot currently pointed at; scanning starts right after the special fibers
        FiberIdx idx = NUM_SPECIAL_FIBERS;
        ReturnType!(Reactor.criticalSection) criticalSection;

        this(uint numFibers) {
            numLivingFibers = numFibers;
            this.criticalSection = theReactor.criticalSection();

            if( numLivingFibers>0 ) {
                findNextFiber();
            }
        }

        // Advance `idx` until it points at a live fiber. Must only be called when one is known to exist.
        void findNextFiber() @safe @nogc nothrow {
            while( ! to!(ReactorFiber*)(idx).isAlive ) {
                idx++;
                DBG_ASSERT!"Asked for next living fiber but none was found"(
                        idx<theReactor.optionsInEffect.numFibers);
            }
        }
    public:
        @property bool empty() const pure @safe @nogc nothrow {
            return numLivingFibers==0;
        }

        void popFront() @safe @nogc nothrow {
            ASSERT!"Popping fiber from empty list"(!empty);
            numLivingFibers--;
            idx++;

            if( !empty )
                findNextFiber;
        }

        @property FiberHandle front() @safe @nogc nothrow {
            auto fib = &theReactor.allFibers[idx.value];
            DBG_ASSERT!"Scanned fiber %s is not alive"(fib.isAlive, idx);

            return FiberHandle(fib);
        }
    }

    return FibersIterator(cast(uint) reactorStats.numUsedFibers);
}

/// Iterate the fibers currently queued in one of the scheduling queues
///
/// Params:
///  priority = selects which queue (normal, high or immediate) to walk
///
/// Returns:
///  A range whose elements are `FiberHandle`s of the queued fibers.
auto iterateScheduledFibers(FiberPriorities priority) nothrow @safe @nogc {
    // Adapter over the queue's own range, so that callers get handles rather than raw fiber pointers
    @notrace static struct Range {
        private typeof(scheduledFibersNormal.range()) fibersRange;

        @property FiberHandle front() nothrow @nogc {
            return FiberHandle(fibersRange.front);
        }

        @property bool empty() const pure nothrow @nogc {
            return fibersRange.empty;
        }

        @notrace void popFront() nothrow {
            fibersRange.popFront();
        }
    }

    // Every case wraps the queue range in `Range`; previously the raw range was returned and `Range` was dead code
    with(FiberPriorities) final switch(priority) {
    case NORMAL:
        return Range(scheduledFibersNormal.range());
    case HIGH:
        return Range(scheduledFibersHigh.range());
    case IMMEDIATE:
        return Range(scheduledFibersImmediate.range());
    }

    assert(false, "Priority must be member of FiberPriorities");
}

/**
 * Set a fiber name
 *
 * Used by certain diagnostic functions to distinguish the different fiber types and create histograms.
 * Arguments bear no specific meaning, but default to describing the function used to start the fiber.
 */
@notrace void setFiberName(FiberHandle fh, string name, void *ptr) nothrow @safe @nogc {
    DBG_ASSERT!"Trying to set fiber name of an invalid fiber"(fh.isValid);
    setFiberName(fh.get, name, ptr);
}

/// ditto
@notrace void setFiberName(T)(FiberHandle fh, string name, scope T dlg) nothrow @safe @nogc if( isDelegate!T ) {
    setFiberName(fh, name, dlg.ptr);
}

/**
 * Temporarily change the fiber's name
 *
 * Meaning of arguments is as for `setFiberName`.
 *
 * returns:
 * A voldemort type whose destructor returns the fiber name to the one it had before. It also has a `release`
 * function for returning the name earlier.
*/
auto pushFiberName(string name, void *ptr) nothrow @safe @nogc {
    static struct PrevName {
    private:
        // Name and pointer the fiber had before the push; written back by `release`
        string name;
        void* ptr;
        // True while there is still something to restore. A separate flag is needed because a previous
        // name of (null, null) is a legitimate value that must be restored too.
        bool armed;

    public:
        @disable this();
        @disable this(this);

        this(string name, void* ptr) nothrow @safe @nogc {
            this.name = name;
            this.ptr = ptr;
            this.armed = true;
        }

        ~this() nothrow @safe @nogc {
            if( armed )
                release();
        }

        // Put the saved name back now. Further calls (and the destructor) then do nothing.
        void release() nothrow @safe @nogc {
            if( !armed )
                return;

            theReactor.setFiberName( theReactor.thisFiber, this.name, this.ptr );

            armed = false;
            name = null;
            ptr = null;
        }
    }

    // Capture what the fiber is called right now, before overwriting it. Capturing the new name
    // instead would make the restore a no-op.
    auto prevName = PrevName(thisFiber.params.fiberName, thisFiber.params.fiberPtr);
    setFiberName( theReactor.thisFiber, name, ptr );

    // The guard is not copyable; move it out to the caller. The moved-from local is left in its
    // init state (not armed), so its destructor restores nothing.
    import std.algorithm: move;
    return move(prevName);
}

/// ditto
@notrace auto pushFiberName(T)(string name, scope T dlg) nothrow @safe @nogc if( isDelegate!T ) {
    return pushFiberName(name, dlg.ptr);
}

/// Retrieve the fiber name set by `setFiberName`
@notrace string getFiberName(FiberHandle fh) nothrow @safe @nogc {
    DBG_ASSERT!"Trying to get fiber name of an invalid fiber"(fh.isValid);
    return fh.get.params.fiberName;
}

/// ditto
@notrace string getFiberName() nothrow @safe @nogc {
    return thisFiber.params.fiberName;
}

/// Retrieve the fiber pointer set by `setFiberName`
@notrace void* getFiberPtr(FiberHandle fh) nothrow @safe @nogc {
    DBG_ASSERT!"Trying to get fiber pointer of an invalid fiber"(fh.isValid);
    return fh.get.params.fiberPtr;
}

/// ditto
@notrace void* getFiberPtr() nothrow @safe @nogc {
    return thisFiber.params.fiberPtr;
}

/**
 * Wait until given fiber finishes
 */
@notrace void joinFiber(FiberHandle fh, Timeout timeout = Timeout.infinite) @safe @nogc {
    ReactorFiber* fiber = fh.get();

    if( fiber is null ) {
        assertMayContextSwitch();
        return;
    }

fiber.params.joinWaiters.wait(timeout);
    DBG_ASSERT!"Fiber handle %s is valid after signalling done"(!fh.isValid, fh);
}

/// Report the current reactor statistics.
///
/// The returned copy has the special fibers subtracted from the per-state fiber histogram.
@property Stats reactorStats() const nothrow @safe @nogc {
    Stats ret = stats;

    foreach( FiberIdx.UnderlyingType i; 0..NUM_SPECIAL_FIBERS ) {
        auto fiberIdx = FiberIdx(i);
        auto fiber = to!(ReactorFiber*)(fiberIdx);
        DBG_ASSERT!"%s is in state %s with histogram of 0"(
                ret.fibersHistogram[fiber.state]>0, fiber.identity, fiber.state );
        ret.fibersHistogram[fiber.state]--;
    }

    return ret;
}

private:
// Pointer to the fiber currently executing. Asserts (DBG_ASSERT) that the reactor is running.
package @property inout(ReactorFiber)* thisFiber() inout nothrow pure @safe @nogc {
    DBG_ASSERT!"No current fiber as reactor was not started"(isRunning);
    return _thisFiber;
}

// Take a callback object from the pool and stamp it with a fresh generation (the value TimerHandle compares against).
@notrace TimedCallback* allocTimedCallback() nothrow @safe @nogc {
    DBG_ASSERT!"Registering timer on non-open reactor"(isOpen);
    auto ret = timedCallbacksPool.alloc();
    ret.generation = timedCallbackGeneration.getNext();

    return ret;
}

// Allocate a callback for a recurring timer. Intervals shorter than the timer granularity are raised to it.
// The callback is not inserted into the time queue here.
TimedCallback* allocRecurringTimer(Duration interval) nothrow @safe @nogc {
    TimedCallback* callback = allocTimedCallback();
    if( interval<optionsInEffect.timerGranularity )
        interval = optionsInEffect.timerGranularity;

    callback.intervalCycles = TscTimePoint.toCycles(interval);
    return callback;
}

// Allocate a recurring timer and queue its first run. The caller still has to set the closure.
TimedCallback* _registerRecurringTimer(Duration interval) nothrow @safe @nogc {
    TimedCallback* callback = allocRecurringTimer(interval);
    rescheduleRecurringTimer(callback);
    return callback;
}

// True when the time queue reports zero cycles until its next entry, i.e. a timer is due now.
@property bool shouldRunTimedCallbacks() nothrow @safe @nogc {
    return timeQueue.cyclesTillNextEntry(TscTimePoint.hardNow()) == 0;
}

// Pick the next fiber to run and switch to it.
//
// Selection order: the immediate queue; then the high priority queue, unless it has been served
// HIGH_PRIORITY_SCHEDULES_RATIO times in a row while normal fibers are waiting; then the normal queue; and the
// idle fiber when all three are empty.
void switchToNext() @safe @nogc {
    //DEBUG!"SWITCH out of %s"(thisFiber.identity);
    assertMayContextSwitch("Context switch");

    stats.numContextSwitches++;

    // in source fiber
    {
        auto now = TscTimePoint.hardNow;
        // Run-time accounting (hogger warning) applies to user fibers only
        if( !thisFiber.flag!"SPECIAL" ) {
            auto fiberRunTime = now - fiberRunStartTime;
            if( fiberRunTime >= optionsInEffect.hoggerWarningThreshold ) {
                WARN!"#HOGGER detected: Fiber %s ran for %sms"(thisFiber.identity, fiberRunTime.total!"msecs");
                // TODO: Add dumping of stack trace
            }
        }

        fiberRunStartTime = now;

        // A timer is due: make sure the main fiber gets scheduled
        if (thisFiber !is mainFiber && !mainFiber.flag!"SCHEDULED" && shouldRunTimedCallbacks()) {
            resumeSpecialFiber(mainFiber);
        }

        ReactorFiber* nextFiber;

        if( !scheduledFibersImmediate.empty ) {
            nextFiber = scheduledFibersImmediate.popHead();
        } else if(
                !scheduledFibersHigh.empty &&
                ( highPrioritySchedules<HIGH_PRIORITY_SCHEDULES_RATIO || scheduledFibersNormal.empty))
        {
            nextFiber = scheduledFibersHigh.popHead();
            highPrioritySchedules++;
        } else if( !scheduledFibersNormal.empty ) {
            nextFiber = scheduledFibersNormal.popHead();
            if( !scheduledFibersHigh.empty ) {
                ERROR!"Scheduled normal priority fiber %s over high priority %s to prevent starvation"(
                        nextFiber.identity, scheduledFibersHigh.head.identity);
            }

            highPrioritySchedules = 0;
        } else {
            DBG_ASSERT!"Idle fiber scheduled but all queues empty"( !idleFiber.flag!"SCHEDULED" );
            nextFiber = idleFiber;
            // Set only to satisfy the "is marked scheduled" assert below, which clears it again
            idleFiber.flag!"SCHEDULED" = true;
            nothingScheduled = true;
        }

        // Outgoing fiber: Running becomes Sleeping; the only other state accepted here is Done, which becomes None
        if( thisFiber.state==FiberState.Running )
            thisFiber.state = FiberState.Sleeping;
        else {
            assertEQ( thisFiber.state, FiberState.Done, "Fiber is in incorrect state" );
            thisFiber.state = FiberState.None;
        }

        DBG_ASSERT!"Couldn't decide on a fiber to schedule"(nextFiber !is null);

        ASSERT!"Next fiber %s is not marked scheduled" (nextFiber.flag!"SCHEDULED", nextFiber.identity);
        nextFiber.flag!"SCHEDULED" = false;
        DBG_ASSERT!"%s is in state %s, should be Sleeping or Starting"(
                nextFiber.state==FiberState.Sleeping || nextFiber.state==FiberState.Starting,
                nextFiber.identity, nextFiber.state);
        nextFiber.state = FiberState.Running;

        // DEBUG!"Switching %s => %s"(thisFiber.identity, nextFiber.identity);
        // The chosen fiber may be the current one (e.g. a yield with nothing else queued); then no switch is made
        if (thisFiber !is nextFiber) {
            // make the switch
            switchTo(nextFiber);
        }
    }
}

// Perform the actual switch to `nextFiber`. Code after `currentFiber.switchTo` runs when this fiber is resumed.
void switchTo(ReactorFiber* nextFiber) @safe @nogc {
    auto currentFiber = thisFiber;
    _thisFiber = nextFiber;

    currentFiber.switchTo(nextFiber);

    // After returning
    /+
       Important note:
       Any code you place here *must* be replicated at the beginning of ReactorFiber.wrapper. Fibers launched
       for the first time do not return from `switchTo` above.
     +/

    // DEBUG!"SWITCH into %s"(thisFiber.identity);

    // This might throw, so it needs to be the last thing we do
    thisFiber.switchInto();
}

// Called when a user fiber has finished: puts it back on the free list and switches away.
// Returns true if the fiber was woken again by a FiberInterrupt, in which case the caller should skip the body.
// Any other exception is forwarded to the main fiber and this function does not return.
bool fiberTerminated() nothrow @notrace {
    ASSERT!"special fibers must never terminate" (!thisFiber.flag!"SPECIAL");

    freeFibers.prepend(thisFiber);

    bool skipBody = false;
    try {
        // Wait for next incarnation of fiber
        switchToNext();
    } catch (FiberInterrupt ex) {
        INFO!"Fiber %s killed by FiberInterrupt exception %s"(currentFiberId, ex.msg);
        skipBody = true;
    } catch (Throwable ex) {
        ERROR!"switchToNext on %s fiber %s failed with exception %s"(
                thisFiber.state==FiberState.Running ? "just starting" : "dead", currentFiberId, ex.msg);
        theReactor.forwardExceptionToMain(ex);
        assert(false);
    }

    return skipBody;
}

// Schedule a special fiber at the head of the immediate queue. Does nothing if it is already scheduled.
void resumeSpecialFiber(ReactorFiber* fib) nothrow @safe @nogc {
    DBG_ASSERT!"Asked to resume special fiber %s which isn't marked special" (fib.flag!"SPECIAL", fib.identity);
    DBG_ASSERT!"Asked to resume special fiber %s with no body set" (fib.flag!"CALLBACK_SET", fib.identity);
    DBG_ASSERT!"Special fiber %s scheduled not in head of list" (
            !fib.flag!"SCHEDULED" || scheduledFibersImmediate.head is fib, fib.identity);

    if (!fib.flag!"SCHEDULED") {
        fib.flag!"SCHEDULED" = true;
        scheduledFibersImmediate.prepend(fib);
        nothingScheduled = false;
    }
}

// Schedule a user fiber.
//
// Queue choice: `immediate` selects the immediate queue, otherwise the PRIORITY flag selects the high priority
// queue, otherwise the normal queue. A fiber that is already scheduled is left where it is, except that an
// `immediate` request first pulls it out of its current (non-immediate) queue.
void resumeFiber(ReactorFiber* fib, bool immediate = false) nothrow @safe @nogc {
    DBG_ASSERT!"Cannot resume a special fiber %s using the standard resumeFiber" (!fib.flag!"SPECIAL", fib.identity);
    ASSERT!"resumeFiber called on %s, which does not have a callback set"(fib.flag!"CALLBACK_SET", fib.identity);

    typeof(scheduledFibersNormal)* queue;
    if( immediate ) {
        if( fib.flag!"SCHEDULED" && fib !in scheduledFibersImmediate ) {
            // Unschedule, so that the common code below re-queues it on the immediate queue
            fib._owner.remove(fib);
            fib.flag!"SCHEDULED" = false;
        }

        queue = &scheduledFibersImmediate;
    } else if( fib.flag!"PRIORITY" ) {
        queue = &scheduledFibersHigh;
    } else {
        queue = &scheduledFibersNormal;
    }

    if (!fib.flag!"SCHEDULED") {
        if (fib._owner !is null) {
            // Whatever this fiber was waiting to do, it is no longer what it needs to be doing
            fib._owner.remove(fib);
        }
        fib.flag!"SCHEDULED" = true;
        queue.append(fib);
        nothingScheduled = false;
    }
}

// Take a fiber from the free list, reset its bookkeeping and schedule it for its first run.
ReactorFiber* _spawnFiber(bool immediate) nothrow @safe @nogc {
    ASSERT!"No more free fibers in pool"(!freeFibers.empty);
    auto fib = freeFibers.popHead();
    assert (!fib.flag!"CALLBACK_SET");
fib.flag!"CALLBACK_SET" = true;
        fib.state = FiberState.Starting;
        // Detach the fiber from any list it may have been linked into
        fib._prevId = FiberIdx.invalid;
        fib._nextId = FiberIdx.invalid;
        fib._owner = null;
        fib.params.flsBlock.reset();
        resumeFiber(fib, immediate);
        return fib;
    }

    /**
     * Body of the idle loop: runs for as long as no fiber is scheduled.
     *
     * While `nothingScheduled` is set, expired timers are run and then either the registered idle callbacks are
     * invoked or, if there are none, the thread sleeps until the next timer is due. The cycles spent idle are
     * accumulated into `stats.idleCycles`. Once something is scheduled, control moves on via `switchToNext`.
     * The function never returns.
     */
    void idleLoop() {
        while (true) {
            // [start, end] brackets the span that is accounted as idle time
            TscTimePoint start, end;
            end = start = TscTimePoint.hardNow;

            while (nothingScheduled) {
                auto critSect = criticalSection();

                /*
                   Since we've updated "end" before calling the timers, these timers won't count as idle time, unless....
                   after running them the scheduledFibers list is still empty, in which case they do.
                 */
                if( runTimedCallbacks(end) )
                    continue;

                // We only reach here if runTimedCallbacks did nothing, in which case "end" is recent enough
                Duration sleepDuration = timeQueue.timeTillNextEntry(end);
                bool countsAsIdle = true;
                if( actualIdleCallbacks.length==1 ) {
                    // A single callback is handed the full time until the next timer
                    countsAsIdle = actualIdleCallbacks[0](sleepDuration) && countsAsIdle;
                } else if ( actualIdleCallbacks.length>1 ) {
                    // With several callbacks, each one is called with a zero duration
                    foreach(cb; actualIdleCallbacks) {
                        with( pushFiberName("Idle callback", cb) ) {
                            countsAsIdle = cb(ZERO_DURATION) && countsAsIdle;
                        }
                    }
                } else {
                    //DEBUG!"Idle fiber called with no callbacks, sleeping %sus"(sleepDuration.total!"usecs");
                    import core.thread; Thread.sleep(sleepDuration);
                }

                if( countsAsIdle ) {
                    end = TscTimePoint.hardNow;
                } else {
                    if( nothingScheduled ) {
                        // We are going in for another round, but this round should not count as idle time
                        stats.idleCycles += end.diff!"cycles"(start);
                        end = start = TscTimePoint.hardNow;
                    }
                }
            }
            stats.idleCycles += end.diff!"cycles"(start);
            switchToNext();
        }
    }

    /**
     * Run every timed callback in `timeQueue` that is due at `now`.
     *
     * Params:
     *  now = the time against which expiry is evaluated.
     *
     * Returns: `true` if at least one callback was run.
     */
    @notrace bool runTimedCallbacks(TscTimePoint now = TscTimePoint.hardNow) {
        // Timer callbacks are not allowed to sleep
        auto criticalSectionContainer =
criticalSection();

        bool ret;

        TimedCallback* callback;
        while ((callback = timeQueue.pop(now)) !is null) {
            callback.closure();
            // One-shot timers go back to the pool; recurring ones are re-armed
            if( callback.intervalCycles==0 )
                timedCallbacksPool.release(callback);
            else
                rescheduleRecurringTimer(callback);

            ret = true;
        }

        return ret;
    }

    /**
     * Re-insert a recurring timer into the time queue for its next firing.
     *
     * The next time point is "now plus one interval", rounded down to a whole multiple of the interval.
     *
     * Params:
     *  callback = the recurring timer to re-arm. Its `intervalCycles` must be non-zero.
     */
    void rescheduleRecurringTimer(TimedCallback* callback) nothrow @safe @nogc {
        ulong cycles = TscTimePoint.hardNow.cycles + callback.intervalCycles;
        cycles -= cycles % callback.intervalCycles;
        callback.timePoint = TscTimePoint(cycles);

        timeQueue.insert(callback);
    }

    /**
     * Mark a fiber as having a pending exception and hand back the buffer in which to store it.
     *
     * Params:
     *  fHandle = handle of the target fiber.
     *  updateBT = value stored in the fiber's EXCEPTION_BT flag.
     *  specialOkay = allow the target to be a special fiber. Without it, targeting a special fiber asserts.
     *
     * Returns: pointer to the fiber's exception buffer, or `null` if the handle is no longer valid or the fiber
     * already has an exception pending.
     */
    ExcBuf* prepThrowInFiber(FiberHandle fHandle, bool updateBT, bool specialOkay = false) nothrow @safe @nogc {
        ReactorFiber* fib = fHandle.get();
        // The validity check must come before any flag is read, as an invalid handle yields a null pointer
        if( fib is null ) {
            WARN!"Failed to throw exception in fiber %s which is no longer valid"(fHandle);
            return null;
        }

        ASSERT!"Cannot throw in the reactor's own fibers"( !fib.flag!"SPECIAL" || specialOkay );

        if( fib.flag!"HAS_EXCEPTION" ) {
            ERROR!"Tried to throw exception in fiber %s which already has an exception pending"(fHandle);
            return null;
        }

        fib.flag!"HAS_EXCEPTION" = true;
        fib.flag!"EXCEPTION_BT" = updateBT;
        return &fib.params.currExcBuf;
    }

    /**
     * Hand an exception over to the main fiber and switch away from the current fiber.
     *
     * The exception is stored as the main fiber's pending exception, the main fiber is scheduled as a special fiber
     * and `switchToNext` is called. The switch is not expected to return. If the exception cannot be set on the main
     * fiber (see `prepThrowInFiber`), the function returns without doing anything.
     *
     * Params:
     *  ex = the exception to forward.
     */
    void forwardExceptionToMain(Throwable ex) nothrow @trusted @nogc {
        ExcBuf* fiberEx = prepThrowInFiber(FiberHandle(mainFiber), false, true);

        if( fiberEx is null )
            return;

        fiberEx.set(ex);
        if( !mainFiber.flag!"SPECIAL" ) {
            // The main fiber only loses its SPECIAL flag while the reactor is stopping; restore it so it can be resumed
            ASSERT!"Main fiber not marked as special but reactor is not stopping"( _stopping );
            mainFiber.flag!"SPECIAL" = true;
        }
        resumeSpecialFiber(mainFiber);
        as!"nothrow"(&theReactor.switchToNext);
        assert(false, "switchToNext on dead system returned");
    }

    /// Start the hang detector timer. Must not be called while the timer is already set.
    void registerHangDetector() @trusted @nogc {
        DBG_ASSERT!"registerHangDetector called 
twice"(!hangDetectorTimer.isSet);
        hangDetectorTimer.start();
    }

    /// Cancel the hang detector timer.
    void deregisterHangDetector() nothrow @trusted @nogc {
        hangDetectorTimer.cancel();
    }

    /**
     * Hang detector callback.
     *
     * Does nothing if the detector is disabled, if the idle fiber is the current one, or if the current fiber has been
     * running for less than `optionsInEffect.hangDetectorTimeout`. Otherwise logs the offending fiber, dumps a stack
     * trace and aborts the process.
     */
    extern(C) static void hangDetectorHandler() nothrow @trusted @nogc {
        if( !theReactor._hangDetectorEnabled )
            return;

        auto now = TscTimePoint.hardNow();
        auto delay = now - theReactor.fiberRunStartTime;

        if( delay<theReactor.optionsInEffect.hangDetectorTimeout || theReactor.currentFiberId == IdleFiberId )
            return;

        long seconds, usecs;
        delay.split!("seconds", "usecs")(seconds, usecs);

        ERROR!"Hang detector triggered for %s after %s.%06s seconds"(theReactor.currentFiberId, seconds, usecs);
        dumpStackTrace();

        ABORT("Hang detector killed process");
    }

    /**
     * Install `faultHandler` for SIGSEGV, SIGILL and SIGBUS.
     *
     * The handlers are registered with SA_SIGINFO, SA_RESETHAND and SA_ONSTACK. Failure to register any of them is
     * reported through `errnoEnforceNGC`.
     */
    void registerFaultHandlers() @trusted @nogc {
        posix_signal.sigaction_t action;
        action.sa_sigaction = &faultHandler;
        action.sa_flags = posix_signal.SA_SIGINFO | posix_signal.SA_RESETHAND | posix_signal.SA_ONSTACK;

        errnoEnforceNGC( posix_signal.sigaction(OSSignal.SIGSEGV, &action, null)==0, "Failed to register SIGSEGV handler" );
        errnoEnforceNGC( posix_signal.sigaction(OSSignal.SIGILL, &action, null)==0, "Failed to register SIGILL handler" );
        errnoEnforceNGC( posix_signal.sigaction(OSSignal.SIGBUS, &action, null)==0, "Failed to register SIGBUS handler" );
    }

    /// Restore the default disposition for SIGBUS, SIGILL and SIGSEGV.
    void deregisterFaultHandlers() nothrow @trusted @nogc {
        posix_signal.signal(OSSignal.SIGBUS, posix_signal.SIG_DFL);
        posix_signal.signal(OSSignal.SIGILL, posix_signal.SIG_DFL);
        posix_signal.signal(OSSignal.SIGSEGV, posix_signal.SIG_DFL);
    }

    /**
     * Signal handler for fatal faults: logs what happened and where, then returns.
     *
     * Params:
     *  signum = the signal being handled.
     *  info = signal information; `si_addr` is logged as the fault address.
     *  ctx = machine context, from which the program counter is read when non-null.
     */
    @notrace extern(C) static void faultHandler(int signum, siginfo_t* info, void* ctx) nothrow @trusted @nogc {
        OSSignal sig = cast(OSSignal)signum;
        string faultName;

        // Translate the signal into a human readable name for the log
        switch(sig) {
        case OSSignal.SIGSEGV:
            faultName = "Segmentation fault";
            break;
        case 
OSSignal.SIGILL:
            faultName = "Illegal instruction";
            break;
        case OSSignal.SIGBUS:
            faultName = "Bus error";
            break;
        default:
            faultName = "Unknown fault";
            break;
        }

        META!"#OSSIGNAL %s"(faultName);
        dumpStackTrace();
        flushLog(); // There is a certain chance the following lines themselves fault. Flush the logs now so that we have something

        Ucontext* contextPtr = cast(Ucontext*)ctx;
        // Program counter at the time of the fault, or 0 when no context was supplied
        auto pc = contextPtr ? contextPtr.uc_mcontext.registers.rip : 0;

        if( isReactorThread ) {
            auto currentSD = theReactor.getCurrentFiberPtr(true).stackDescriptor;
            ERROR!"%s on %s address 0x%x, PC 0x%x stack params at 0x%x"(
                    faultName, theReactor.currentFiberId, info.si_addr, pc, theReactor.getCurrentFiberPtr(true).params);
            ERROR!"Stack is at [%s .. %s]"( currentSD.bstack, currentSD.tstack );
            // The guard zone is the GUARD_ZONE_SIZE bytes immediately below bstack
            auto guardAddrStart = currentSD.bstack - GUARD_ZONE_SIZE;
            if( info.si_addr < currentSD.bstack && info.si_addr >= guardAddrStart ) {
                ERROR!"Hit stack guard area"();
            }
        } else {
            ERROR!"%s on OS thread at address %s, PC %s"(faultName, info.si_addr, pc);
        }
        flushLog();

        // Exit the fault handler, which will re-execute the offending instruction. Since we registered as run once, the default handler
        // will then kill the node.
    }

    /**
     * The reactor's main loop, running on the main fiber.
     *
     * Requires the reactor to be open and not already running. The D garbage collector is disabled for the duration
     * of the loop and re-enabled on exit. The loop repeatedly runs due timers, performs a GC collection when one was
     * requested and switches to the next fiber, until the reactor is asked to stop.
     */
    void mainloop() {
        assert (isOpen);
        assert (!isRunning);
        assert (_thisFiber is null);

        _running = true;
        scope(exit) _running = false;

        GC.disable();
        scope(exit) GC.enable();

        lastGCStats = GC.stats();

        if( optionsInEffect.utGcDisabled ) {
            // GC is disabled during the reactor run. 
// Run it before we start
            GC.collect();
        }

        // Don't register the hang detector until after we've finished running the GC
        if( optionsInEffect.hangDetectorTimeout !is Duration.zero ) {
            registerHangDetector();
            _hangDetectorEnabled = true;
        } else {
            _hangDetectorEnabled = false;
        }

        // Undo the hang detector registration on the way out
        scope(exit) {
            if( optionsInEffect.hangDetectorTimeout !is Duration.zero )
                deregisterHangDetector();

            _hangDetectorEnabled = false;
        }

        _thisFiber = mainFiber;
        scope(exit) _thisFiber = null;

        // Unless the GC is disabled for unit tests, request a collection every gcInterval
        if( !optionsInEffect.utGcDisabled )
            TimerHandle gcTimer = registerRecurringTimer!requestGCCollectionInternal(optionsInEffect.gcInterval, false);

        try {
            while (!_stopping) {
                DBG_ASSERT!"Switched to mainloop with wrong thisFiber %s"(thisFiber is mainFiber, thisFiber.identity);
                runTimedCallbacks();
                if( _gcCollectionNeeded )
                    gcCollect();

                if( !_stopping )
                    switchToNext();
            }
        } catch( ReactorExit ex ) {
            ASSERT!"Main loop threw ReactorExit, but reactor is not stopping"(_stopping);
        }

        performStopReactor();
    }

    /**
     * Service a pending GC collection request.
     *
     * Clears `_gcCollectionNeeded`. The collection itself only runs if it was forced, if
     * `optionsInEffect.gcRunThreshold` is zero, or if the GC's used size grew by more than that threshold since the
     * last recorded stats.
     */
    void gcCollect() {
        _gcCollectionNeeded = false;
        auto statsBefore = GC.stats();

        if(
                _gcCollectionForce ||
                optionsInEffect.gcRunThreshold==0 ||
                statsBefore.usedSize > lastGCStats.usedSize+optionsInEffect.gcRunThreshold )
        {
            TscTimePoint.hardNow(); // Update the hard now value
            DEBUG!"#GC collection cycle started, %s bytes allocated since last run (forced %s)"(statsBefore.usedSize - lastGCStats.usedSize, _gcCollectionForce);

            GC.collect();
            TscTimePoint.hardNow(); // Update the hard now value
            lastGCStats = GC.stats();
            DEBUG!"#GC collection cycle ended, freed %s bytes"(statsBefore.usedSize - lastGCStats.usedSize);

            _gcCollectionForce = false;
        }
    }

    /**
     * Shut down all fibers as part of stopping the reactor.
     *
     * Must be called from the main fiber.
     */
    void performStopReactor() @nogc {
        ASSERT!"performStopReactor must be 
called from the main fiber. Use Reactor.stop instead"( isMain );

        // The same ReactorExit instance is thrown into every live fiber
        Throwable reactorExit = mkEx!ReactorExit("Reactor is quitting");
        foreach(ref fiber; allFibers[1..$]) { // All fibers but main
            if( fiber.isAlive ) {
                // Special fibers cannot be thrown into, so drop the flag first
                fiber.flag!"SPECIAL" = false;
                throwInFiber(FiberHandle(&fiber), reactorExit);
            }
        }

        thisFiber.flag!"SPECIAL" = false;
        yield();

        META!"Stopping reactor"();
        _stopping = false;
    }

    /**
     * Set the descriptive name and associated pointer of a fiber.
     *
     * Params:
     *  fib = the fiber to label.
     *  name = string identifying what the fiber is doing.
     *  ptr = pointer value stored alongside the name.
     */
    @notrace void setFiberName(ReactorFiber* fib, string name, void *ptr) nothrow @safe @nogc {
        fib.params.fiberName = name;
        fib.params.fiberPtr = ptr;
    }

    /// ditto. This overload takes a delegate and stores its context pointer (`dlg.ptr`) as the fiber pointer.
    @notrace void setFiberName(T)(ReactorFiber* fib, string name, scope T dlg) nothrow @safe @nogc if( isDelegate!T ) {
        fib.params.fiberName = name;
        fib.params.fiberPtr = dlg.ptr;
    }

    /**
     * Run the pending cross fiber hook and switch back to the fiber that requested it.
     *
     * The hook runs under a critical section, and a hook that throws trips an assert. The hook and its caller handle
     * are cleared before switching back to the caller.
     */
    void performCrossFiberHook() @safe @nogc {
        ReactorFiber* callingFiber;

        // Perform the hook under critical section
        with(criticalSection()) {
            scope(failure) ASSERT!"Cross fiber hook threw"(false);
            crossFiberHook();

            callingFiber = crossFiberHookCaller.get();
            ASSERT!"Fiber invoking hook %s invalidated by hook"(callingFiber !is null, crossFiberHookCaller.identity);

            // Clear the hooks so they don't get called when returning to the hooker fiber
            crossFiberHook = null;
            crossFiberHookCaller.reset();
        }

        switchTo(callingFiber);
    }

    /**
     * Run `callback` in the context of another fiber.
     *
     * If the handle is no longer valid, nothing happens. If the handle refers to the calling fiber, the callback is
     * simply invoked under a critical section. Otherwise the callback is stored as the cross fiber hook and control
     * is switched to the target fiber, which must be in state Starting or Sleeping.
     *
     * Params:
     *  fh = handle of the fiber in which to run the callback.
     *  callback = the code to run. Only one hook may be pending at a time.
     */
    @notrace void callInFiber(FiberHandle fh, scope void delegate() nothrow @safe @nogc callback) @trusted @nogc {
        ASSERT!"Cannot set hook when one is already set"( crossFiberHook is null && !crossFiberHookCaller.isSet );
        ReactorFiber* fib = fh.get();
        if( fib is null )
            // Fiber isn't valid - don't do anything
            return;

        if( fib is thisFiber ) {
            // We were asked to switch to ourselves. 
// Just run the callback
            auto critSect = criticalSection();
            callback();

            return;
        }

        with(FiberState) {
            ASSERT!"Trying to dump stack trace of %s which is in invalid state %s"(
                    fib.state==Starting || fib.state==Sleeping, fib.identity, fib.state );
        }

        // We are storing a scoped delegate inside a long living pointer, but we make sure to finish using it before exiting.
        crossFiberHook = callback;
        crossFiberHookCaller = FiberHandle(thisFiber); // Don't call currentFiber, as we might be a special fiber

        switchTo(fib);

        // By the time control returns here, the hook must have been run and cleared
        DBG_ASSERT!"crossFiberHookCaller not cleared after call"( !crossFiberHookCaller.isSet );
        DBG_ASSERT!"crossFiberHook not cleared after call"( crossFiberHook is null );
    }

    import std.string : format;
    /*
       Source code generator for the <LEVEL>_AS family of logging functions.

       Each generated function emits a log line as though it were issued by the fiber referred to by `fh`: it switches
       the logging context into that fiber, logs, and switches the logging context back to the current fiber on exit.
       If the handle is no longer valid, an ERROR log carrying the original message is emitted instead.
     */
    enum string decl_log_as(string logLevel) = q{
        @notrace public void %1$s_AS(
            string fmt, string file = __FILE_FULL_PATH__, string mod = __MODULE__, int line = __LINE__, T...)
            (FiberHandle fh, T args) nothrow @safe @nogc
        {
            auto fiber = fh.get;
            if( fiber is null ) {
                ERROR!("Can't issue %1$s log as %%s. Original log: "~fmt, file, mod, line)(fh, args);
                return;
            }

            auto currentFiber = getCurrentFiberPtr(true);
            fiber.logSwitchInto();
            scope(exit) currentFiber.logSwitchInto();

            %1$s!(fmt, file, mod, line)(args);
        }
    }.format(logLevel);
    // Instantiate DEBUG_AS, INFO_AS, WARN_AS, ERROR_AS and META_AS
    mixin(decl_log_as!"DEBUG");
    mixin(decl_log_as!"INFO");
    mixin(decl_log_as!"WARN");
    mixin(decl_log_as!"ERROR");
    mixin(decl_log_as!"META");

    /**
     * Log the stack trace of a given fiber.
     *
     * The fiber should be in a valid state. This is the equivalent to the fiber itself running `dumpStackTrace`.
*/
    @notrace public void LOG_TRACEBACK_AS(
            FiberHandle fh, string text, string file = __FILE_FULL_PATH__, size_t line = __LINE__) @safe @nogc
    {
        // Run dumpStackTrace on the target fiber through the cross fiber hook
        callInFiber(fh, {
                dumpStackTrace(text, file, line);
            });
    }
}

// Expose the conversion to/from ReactorFiber only to the reactor package

/// Convert a fiber index to a pointer into the reactor's fiber array. An invalid index yields `null`.
package ReactorFiber* to(T : ReactorFiber*)(FiberIdx fidx) nothrow @safe @nogc {
    if (!fidx.isValid)
        return null;

    ASSERT!"Reactor is not open"( theReactor.isOpen );
    return &theReactor.allFibers[fidx.value];
}

/// Convert a fiber pointer to its index in the reactor's fiber array. A `null` pointer yields `FiberIdx.invalid`.
package FiberIdx to(T : FiberIdx)(const ReactorFiber* rfp) nothrow @safe @nogc {
    if (rfp is null)
        return FiberIdx.invalid;

    ASSERT!"Reactor is not open"( theReactor.isOpen );
    // Pointer difference against the first element gives the slot number
    auto idx = rfp - &theReactor.allFibers.arr[0];
    DBG_ASSERT!"Reactor fiber pointer not pointing to fibers pool: base %s ptr %s idx %s"(
            idx>=0 && idx<theReactor.allFibers.arr.length, &theReactor.allFibers.arr[0], rfp, idx);
    return FiberIdx( cast(ushort)idx );
}

/// Extract the fiber index from a fiber ID by masking its value with `theReactor.maxNumFibersMask`.
package FiberIdx to(T : FiberIdx)(FiberId fiberId) nothrow @safe @nogc {
    return FiberIdx( fiberId.value & theReactor.maxNumFibersMask );
}

// The reactor singleton, shared between threads (__gshared)
private __gshared Reactor _theReactor;
private bool /* thread local */ _isReactorThread;

/**
 * return a reference to the Reactor singleton
 *
 * In theory, @safe code must not access global variables. Since theReactor is only meant to be used by a single thread, however, this
 * function is @trusted. If it were not, practically no code could be @safe.
*/
@property ref Reactor theReactor() nothrow @trusted @nogc {
    //DBG_ASSERT!"not main thread"(_isReactorThread);
    return _theReactor;
}

/// Returns whether the current thread is the thread in which theReactor is running
@property bool isReactorThread() nothrow @safe @nogc {
    return _isReactorThread;
}

version (unittest) {
    /**
     * Run a test inside a reactor
     *
     * This is a convenience function for running a UT as a reactor fiber. A new reactor will be initialized, `dg` called
     * and the reactor will automatically stop when `dg` is done.
     *
     * Params:
     *  dg = the test body. Its return value is passed to `theReactor.stop` and becomes the return value of this function.
     *  options = options with which to set up the reactor.
     *
     * Returns: the value returned by `theReactor.start`.
     */
    int testWithReactor(int delegate() dg, Reactor.OpenOptions options = Reactor.OpenOptions.init) {
        // Start from an empty signal mask
        sigset_t emptyMask;
        errnoEnforceNGC( sigprocmask( SIG_SETMASK, &emptyMask, null )==0, "sigprocmask failed" );

        theReactor.setup(options);
        scope(success) theReactor.teardown();

        bool delegateReturned = false;

        // Fiber body wrapping dg, translating the ways it can terminate into the appropriate reactor action
        void wrapper() {
            int ret;
            try {
                ret = dg();

                delegateReturned = true;
            } catch(ReactorExit ex) {
                LOG_EXCEPTION(ex);
                assert(false, "testWithReactor's body called theReactor.stop explicitly");
            } catch(FiberInterrupt ex) {
                LOG_EXCEPTION(ex);
                theReactor.stop();
            } catch(Throwable ex) {
                // No need to stop the reactor - the exception thrown will terminate it
                LOG_EXCEPTION(ex);
                ERROR!"Test terminated abnormally"();
                throw ex;
            }

            theReactor.stop( ret );
        }

        theReactor.spawnFiber(&wrapper);
        int ret = theReactor.start();
        assert (delegateReturned, "testWithReactor called with a delegate that threw without returning");

        return ret;
    }

    /// ditto
    void testWithReactor(void delegate() dg, Reactor.OpenOptions options = Reactor.OpenOptions.init) {
        // Adapt the void delegate to the int-returning overload
        int wrapper() {
            dg();

            return 0;
        }

        testWithReactor(&wrapper, options);
    }

    public import 
mecca.runtime.ut: mecca_ut;

    /**
     * Mixin declaring a unittest that runs all test cases of `FIXTURE` inside a reactor.
     *
     * If `FIXTURE` has a `reactorOptions` member, it is used as the reactor's open options. Any exception escaping the
     * test cases is logged and the process is terminated with `DIE`.
     */
    mixin template TEST_FIXTURE_REACTOR(FIXTURE) {
        import mecca.runtime.ut: runFixtureTestCases;
        import mecca.reactor: testWithReactor, Reactor;
        unittest {
            Reactor.OpenOptions options;

            static if( __traits(hasMember, FIXTURE, "reactorOptions") ) {
                options = FIXTURE.reactorOptions;
            }

            testWithReactor({
                    try {
                        runFixtureTestCases!(FIXTURE)();
                    } catch( Throwable ex ) {
                        import mecca.log: LOG_EXCEPTION;
                        import mecca.lib.exception: DIE;

                        LOG_EXCEPTION(ex);
                        DIE("UT failed due to exception");
                    }
                }, options);
        }
    }
}


unittest {
    // Two fibers taking turns printing their name, yielding after every line
    import std.stdio;

    theReactor.setup();
    scope(exit) theReactor.teardown();

    static void fibFunc(string name) {
        foreach(i; 0 .. 10) {
            writeln(name);
            theReactor.yield();
        }
        theReactor.stop();
    }

    theReactor.spawnFiber(&fibFunc, "hello");
    theReactor.spawnFiber(&fibFunc, "world");
    theReactor.start();
}

unittest {
    // Test simple timeout
    import std.stdio;

    theReactor.setup();
    scope(exit) theReactor.teardown();

    uint counter;
    TscTimePoint start;

    // Sleep for the given duration, then log by how much the wake up overshot and count the completion
    void fiberFunc(Duration duration) {
        INFO!"Fiber %s sleeping for %s"(theReactor.currentFiberHandle, duration.toString);
        theReactor.sleep(duration);
        auto now = TscTimePoint.hardNow;
        counter++;
        INFO!"Fiber %s woke up after %s, overshooting by %s counter is %s"(theReactor.currentFiberHandle, (now - start).toString,
                ((now-start) - duration).toString, counter);
    }

    // Stops the reactor after 250ms, which is longer than any of the sleeps above
    void ender() {
        INFO!"Fiber %s ender is sleeping for 250ms"(theReactor.currentFiberHandle);
        theReactor.sleep(dur!"msecs"(250));
        INFO!"Fiber %s ender woke up"(theReactor.currentFiberHandle);

        theReactor.stop();
    }

    theReactor.spawnFiber(&fiberFunc, dur!"msecs"(10));
theReactor.spawnFiber(&fiberFunc, dur!"msecs"(100));
    theReactor.spawnFiber(&fiberFunc, dur!"msecs"(150));
    theReactor.spawnFiber(&fiberFunc, dur!"msecs"(20));
    theReactor.spawnFiber(&fiberFunc, dur!"msecs"(30));
    theReactor.spawnFiber(&fiberFunc, dur!"msecs"(200));
    theReactor.spawnFiber(&ender);

    start = TscTimePoint.hardNow;
    theReactor.start();
    auto end = TscTimePoint.hardNow;
    INFO!"UT finished in %s"((end - start).toString);

    // One increment per sleeping fiber
    assert(counter == 6, "Not all fibers finished");
}

unittest {
    // Test suspending timeout
    import std.stdio;

    theReactor.setup();
    scope(exit) theReactor.teardown();

    // Suspend with a 4ms timeout and nobody to resume us: the suspend must end with TimeoutExpired
    void fiberFunc() {
        bool thrown;

        try {
            theReactor.suspendCurrentFiber( Timeout(dur!"msecs"(4)) );
        } catch(TimeoutExpired ex) {
            thrown = true;
        }

        assert(thrown);

        theReactor.stop();
    }

    theReactor.spawnFiber(&fiberFunc);
    theReactor.start();
}

unittest {
    // Test one-shot and recurring timers, including cancellation
    import std.stdio;

    // GC running during the test mess with the timing
    Reactor.OpenOptions oo;
    oo.utGcDisabled = true;

    theReactor.setup(oo);
    scope(exit) theReactor.teardown();

    void fiberFunc() {
        TimerHandle[8] handles;
        Duration[8] timeouts = [
            dur!"msecs"(2),
            dur!"msecs"(200),
            dur!"msecs"(6),
            dur!"msecs"(120),
            dur!"msecs"(37),
            dur!"msecs"(40),
            dur!"msecs"(133),
            dur!"msecs"(8),
        ];

        // Bit i is set once timer i has fired
        ubyte a;

        static void timer(ubyte* a, TimerHandle* handle, ubyte bit) {
            (*a) |= 1<<bit;

            (*handle) = TimerHandle.init;
        }

        foreach(ubyte i, duration; timeouts) {
            handles[i] = theReactor.registerTimer!timer( Timeout(duration), &a, &handles[i], i );
        }

        uint recurringCounter;
        static void recurringTimer(uint* counter) {
            (*counter)++;
        }

        TimerHandle 
recurringTimerHandle = theReactor.registerRecurringTimer!recurringTimer( dur!"msecs"(7), &recurringCounter );

        theReactor.sleep(dur!"msecs"(3));

        // Cancel one expired timeout and one yet to happen
        handles[0].cancelTimer();
        handles[6].cancelTimer();

        // Wait for all timers to run
        theReactor.sleep(dur!"msecs"(200));

        // Every bit except bit 6 (the timer cancelled before it fired) must be set
        assert(a == 0b1011_1111);
        ASSERT!"Recurring timer should run 29 times, ran %s"(recurringCounter>=25 && recurringCounter<=30, recurringCounter); // 203ms / 7

        theReactor.stop();
    }

    theReactor.spawnFiber(&fiberFunc);
    theReactor.start();
}

unittest {
    // Test throwing an exception into another fiber, and the per-state fiber histogram along the way
    import mecca.reactor.sync.event;

    // Linux has a fiber running for the signal handler, Darwin does not.
    // NOTE(review): startupFiberCount is left undefined on any other platform - confirm none is expected here
    version (Darwin)
        enum startupFiberCount = 0;
    else version (linux)
        enum startupFiberCount = 1;

    theReactor.setup();
    scope(exit) theReactor.teardown();

    Event evt1, evt2;

    class TheException : Exception {
        this() {
            super("The Exception");
        }
    }

    void fib2() {
        // Release 1
        evt1.set();

        try {
            // Wait for 1 to do its stuff
            evt2.wait();

            assert( false, "Exception not thrown" );
        } catch( Exception ex ) {
            assert( ex.msg == "The Exception" );
        }

        theReactor.stop();
    }

    void fib1() {
        assertEQ( theReactor.getFiberState(theReactor.currentFiberHandle), FiberState.Running );
        assertEQ( theReactor.reactorStats.fibersHistogram[FiberState.Starting], 0 );
        auto fib = theReactor.spawnFiber(&fib2);
        assertEQ( theReactor.getFiberState(fib), FiberState.Starting );
        assertEQ( theReactor.reactorStats.fibersHistogram[FiberState.Starting], 1 );

        evt1.wait();

        assertEQ( theReactor.reactorStats.fibersHistogram[FiberState.Starting], 0 );

        assertEQ( theReactor.reactorStats.fibersHistogram[FiberState.Sleeping], startupFiberCount + 
1 );
        theReactor.throwInFiber(fib, new TheException);
        // The following should be "1", because the state would switch to Scheduled. Since that's not implemented yet...
        assertEQ( theReactor.reactorStats.fibersHistogram[FiberState.Sleeping], startupFiberCount + 1 ); // Should be 1
        assertEQ( theReactor.reactorStats.fibersHistogram[FiberState.Scheduled], 0 ); // Should be 1
        evt2.set();
    }

    theReactor.spawnFiber(&fib1);
    theReactor.start();
}

unittest {
    // Run with the hang detector armed, using a body that sleeps in the reactor (the code that would trigger it is commented out)
    Reactor.OpenOptions options;
    options.hangDetectorTimeout = 20.msecs;
    options.utGcDisabled = true;
    DEBUG!"sanity: %s"(options.hangDetectorTimeout.toString);

    testWithReactor({
            theReactor.sleep(200.msecs);
            /+
            // To trigger the hang, uncomment this:
            import core.thread;
            Thread.sleep(200.msecs);
            +/
        }, options);
}

/+
unittest {
    // Trigger a segmentation fault
    theReactor.options.hangDetectorTimeout = 20.msecs;
    DEBUG!"sanity: %s"(theReactor.options.hangDetectorTimeout);

    testWithReactor({
            int* a = cast(int*) 16;
            *a = 3;
        });
}
+/

unittest {
    // No automatic failing, but at least exercise the *_AS loggers
    import mecca.reactor.sync.event;
    Event finish;

    META!"#UT exercise log_AS functions"();

    // Target fiber: stays blocked on the event so that the main test body can log on its behalf
    void fiberFunc() {
        DEBUG!"Fiber started"();
        finish.wait();
        DEBUG!"Fiber finished"();
    }

    void testBody() {
        auto fh = theReactor.spawnFiber(&fiberFunc);
        theReactor.yield();

        DEBUG!"Main fiber logging as %s"(fh);
        theReactor.DEBUG_AS!"DEBUG trace with argument %s"(fh, 17);
        theReactor.INFO_AS!"INFO trace"(fh);
        theReactor.WARN_AS!"WARN trace"(fh);
        theReactor.ERROR_AS!"ERROR trace"(fh);
        theReactor.META_AS!"META trace"(fh);

        DEBUG!"Killing fiber"();
        finish.set();
        theReactor.yield();

        DEBUG!"Trying to log as dead fiber"();
theReactor.DEBUG_AS!"DEBUG trace on dead fiber with argument %s"(fh, 18);
    }

    testWithReactor(&testBody);
}

unittest {
    // Make sure we do not immediately repeat the same FiberId on relaunch
    FiberId fib;

    void test1() {
        fib = theReactor.currentFiberId();
    }

    void test2() {
        assert(theReactor.currentFiberId() != fib);
    }

    testWithReactor({
            theReactor.spawnFiber(&test1);
            theReactor.yield();
            theReactor.spawnFiber(&test2);
            theReactor.yield();
        });
}

unittest {
    // Make sure that FiberHandle can return a ReactorFiber*
    static void fiberBody() {
        assert( theReactor.currentFiberPtr == theReactor.currentFiberHandle.get() );
    }

    testWithReactor({
            theReactor.spawnFiber!fiberBody(); // Run twice to make sure the generation isn't 0
            theReactor.yield();
            theReactor.spawnFiber!fiberBody();
            theReactor.yield();
        });
}

unittest {
    // The value returned by the test body must come back out of testWithReactor
    int ret = testWithReactor( { return 17; } );

    assert( ret==17 );
}

unittest {
    // test priority of scheduled fibers

    uint gen;
    string[] runOrder;

    // Fiber body: optionally boost own priority, suspend, and record the order in which fibers get resumed
    void verify(bool Priority)(string id) {

        static if( Priority ) {
            auto priorityWatcher = theReactor.boostFiberPriority();
        }
        theReactor.suspendCurrentFiber();
        runOrder ~= id;
    }

    testWithReactor({
            FiberHandle[] fh;
            fh ~= theReactor.spawnFiber(&verify!false, "reg1");
            fh ~= theReactor.spawnFiber(&verify!false, "reg2");
            fh ~= theReactor.spawnFiber(&verify!false, "imm1"); // Scheduled immediate
            fh ~= theReactor.spawnFiber(&verify!true, "pri1");
            fh ~= theReactor.spawnFiber(&verify!false, "imm2"); // Scheduled immediate
            fh ~= theReactor.spawnFiber(&verify!true, "pri2");
            fh ~= theReactor.spawnFiber(&verify!false, "reg3");
            fh ~= theReactor.spawnFiber(&verify!true, "pri3"); // reg1 bypasses this one in 
// order to avoid starvation
            fh ~= theReactor.spawnFiber(&verify!false, "imm3"); // Scheduled immediate
            theReactor.yield();
            // Resume everyone; indices 2 and 4 ("imm1" and "imm2") are resumed as immediate
            foreach(i; 0..fh.length)
                theReactor.resumeFiber( fh[i], i==2 || i==4 );

            // Resuming immediate an already scheduled fiber should change its priority
            theReactor.resumeFiber( fh[8], true );

            theReactor.yield();
            assertEQ(["imm1", "imm2", "imm3", "pri1", "pri2", "reg1", "pri3", "reg2", "reg3"], runOrder);
        });
}

unittest {
    // Test join

    static void fiberBody() {
        theReactor.yield();
        theReactor.yield();
        theReactor.yield();
    }

    testWithReactor({
            FiberHandle fh = theReactor.spawnFiber!fiberBody();
            assertEQ( theReactor.getFiberState(fh), FiberState.Starting );
            theReactor.yield();
            assertEQ( theReactor.getFiberState(fh), FiberState.Scheduled );
            theReactor.joinFiber(fh);
            assertEQ( theReactor.getFiberState(fh), FiberState.None );
            // Joining a fiber that has already finished, with a zero timeout
            theReactor.joinFiber(fh, Timeout(Duration.zero));
        });
}

unittest {
    // Throw FiberKilled into fibers that sleep in an endless loop
    testWithReactor({
            void doNothing() {
                while( true )
                    theReactor.sleep(1.msecs);
            }

            FiberHandle[] handles;
            handles ~= theReactor.spawnFiber(&doNothing);
            handles ~= theReactor.spawnFiber(&doNothing);
            theReactor.sleep(5.msecs);

            foreach(fh; handles) {
                theReactor.throwInFiber!FiberKilled(fh);
            }

            DEBUG!"Sleeping for 1ms"();
            theReactor.sleep(1.msecs);
            DEBUG!"Woke up from sleep"();
        });
}

unittest {
    // Exercise LOG_TRACEBACK_AS repeatedly against a sleeping fiber
    testWithReactor({
            static void fiber() {
                theReactor.sleep(1.seconds);
            }

            auto fh = theReactor.spawnFiber!fiber();

            DEBUG!"Starting test"();
            foreach(i; 0..4) {
                theReactor.LOG_TRACEBACK_AS(fh, "test");
                DEBUG!"Back in fiber"();
                theReactor.yield();
            }
            DEBUG!"Test ended"();
        });
}