/**
 * Cascading (hierarchical) time queue.
 *
 * Entries carrying a `timePoint` are sorted into `numLevels` levels of `numBins` bins each. A bin of level `L`
 * covers `numBins ^^ L` bins of level 0. Entries too far in the future for the last level are parked in an
 * overflow list and are re-inserted when the queue wraps around.
 */
module mecca.lib.time_queue;

// Licensed under the Boost license. Full copyright information in the AUTHORS file

import std.algorithm : min, max;
import std.math : abs;

import mecca.containers.lists;
import mecca.lib.division: S64Divisor;
import mecca.lib.exception;
import mecca.lib.time;
import mecca.lib.reflection;
import mecca.log;

// DMDBUG define a global for the sole purpose of forcing functions to not be misidentified as pure. See notPure()
private __gshared uint notPureDMDBUG;

/**
 * A time queue made of several levels of bins with progressively coarser resolution.
 *
 * Params:
 *  T = the entry type. The code below reads `entry.timePoint` and compares popped values against `T.init`
 *      using `is`. The unit tests use pointers to structs with `_next`/`_prev` members (presumably required by the
 *      underlying linked list - confirm against mecca.containers.lists).
 *  numBins = number of bins in each level. Must be a power of 2 greater than 1.
 *  numLevels = number of levels. Must be greater than 1.
 *  hasOwner = when true the bins are `LinkedListWithOwner`, and `cancel` becomes available.
 */
struct CascadingTimeQueue(T, size_t numBins, size_t numLevels, bool hasOwner = false) {
private:
    debug(timeq_verbosity) {
        enum INTERNAL_VERBOSITY = true;
    } else {
        enum INTERNAL_VERBOSITY = false;
    }

    static assert (numBins>1, "Seriously? What were you thining?");
    static assert ((numBins & (numBins - 1)) == 0, "numBins must be a power of 2");
    static assert (numLevels > 1);
    static assert (numBins * numLevels < 256*8);
    // Minimal maximal span is the size of all bins in the last level + one bin of the first level. Not exported because
    // meaningless, as we have the overflow to catch uses outside this span.
    enum spanInBins = (numBins-1) * rawBinsInBin(numLevels-1) + 1;

    alias Phase = ulong;

    TscTimePoint[numLevels] baseTimes; // Time the first bin of each level begins on
    TscTimePoint[numLevels] endTimes; // The time point that marks bins no longer available in this level
    TscTimePoint poppedTime; // Marks the END of the bin currently pointed to by phase (last timestamp in bin)
    long resolutionCycles;
    ulong nextEntryHint = ulong.max; // Next active bin not before this number of bins (max = invalid)
    S64Divisor[numLevels] resolutionDividers;
    Phase phase;
    size_t _length; // Number of elements in the queue
    version(unittest) {
        // Per-level insertion counters. The last slot counts insertions into the overflow list.
        ulong[numLevels+1] stats;
    }

    static if( hasOwner )
        alias ListType = LinkedListWithOwner!T;
    else
        alias ListType = LinkedList!T;

    ListType[numBins][numLevels] bins;
    // Two overflow lists: one active, one standby used as the destination while the active one is being cascaded.
    ListType[2] overflow;
    uint activeOverflow;

public:
    static if( hasOwner ) {
        /// Type of the owner attribute (only defined when `hasOwner` is true).
        alias OwnerAttrType = ListType*;
    }

    /**
     * Prepare the queue for use.
     *
     * Params:
     *  resolution = duration covered by a single level 0 bin. Converted to cycles with `TscTimePoint.toCycles`.
     *  startTime = the time the queue starts at.
     */
    void open(Duration resolution, TscTimePoint startTime = TscTimePoint.hardNow) @safe @nogc {
        open(TscTimePoint.toCycles(resolution), startTime);
    }

    /**
     * Prepare the queue for use.
     *
     * The queue must be empty (`length == 0`) when this is called.
     *
     * Params:
     *  resolutionCycles = number of cycles covered by a single level 0 bin. Must be positive.
     *  startTime = the time the queue starts at.
     */
    void open(long resolutionCycles, TscTimePoint startTime) @safe @nogc {
        assert (resolutionCycles > 0);
        assertEQ (_length, 0);
        this.resolutionCycles = resolutionCycles;
        foreach( uint level; 0..numLevels ) {
            this.resolutionDividers[level] = S64Divisor(resolutionCycles*rawBinsInBin(level));
        }
        this.phase = 0;
        this.nextEntryHint = ulong.max;
        version (unittest) {
            this.stats[] = 0;
        }

        this.poppedTime = startTime;
        this.baseTimes[0] = this.poppedTime;
        this.baseTimes[0] -= resolutionCycles - 1; // First bin in first level is for already expired jobs
        foreach( uint level; 0..numLevels ) {
            this.endTimes[level] = this.baseTimes[level];
            this.endTimes[level] += binsInLevel(level) * resolutionCycles;

            if( level==numLevels-1 )
                break;

            // On init, next level begins where current level ends
            this.baseTimes[level+1] = this.endTimes[level];
        }
    }

    /**
     * Remove every entry from all bins and from the active overflow list.
     *
     * Asserts that the standby overflow list is empty and that the element count reaches 0.
     */
    void close() nothrow @safe @nogc {
        foreach(ref lvl; bins) {
            foreach(ref bin; lvl) {
                while( !bin.empty ) {
                    _length--;
                    bin.popHead;
                }
            }
        }

        ASSERT!"Standby overflow bin not empty"( overflow[1-activeOverflow].empty );
        while( !overflow[activeOverflow].empty ) {
            _length--;
            overflow[activeOverflow].popHead;
        }

        ASSERT!"At end of CascadingTimeQueue.close() _length is %s (expected 0)"(_length==0, _length);
    }

    /// Number of entries currently stored in the queue (bins and overflow).
    @property size_t length() pure const nothrow @safe @nogc {
        return _length;
    }

    static if( hasOwner ) {
        /**
         * Remove an entry from the queue without popping it.
         *
         * Only available when `hasOwner` is true. The entry is handed to `ListType.discard` and the element count
         * is decremented unconditionally.
         */
        void cancel(T entry) nothrow @safe @nogc {
            ListType.discard(entry);
            _length--;
        }
    }

    /**
     * Same as `cyclesTillNextEntry`, expressed as a `Duration`.
     *
     * Returns: `Duration.max` when `cyclesTillNextEntry` reports `ulong.max`.
     */
    @notrace Duration timeTillNextEntry(TscTimePoint now) nothrow @safe @nogc {
        ulong cycles = cyclesTillNextEntry(now);

        if( cycles==ulong.max )
            return Duration.max;

        return TscTimePoint.toDuration(cycles);
    }

    /**
     * Number of cycles, counted from `now`, until the bin named by the next-entry hint.
     *
     * Params:
     *  now = the current time. Must not be earlier than the last popped time.
     *
     * Returns: `ulong.max` when the hint is `ulong.max`, 0 when the computed wait is negative, the wait in cycles
     * otherwise.
     */
    ulong cyclesTillNextEntry(TscTimePoint now) nothrow @safe @nogc {
        DBG_ASSERT!"time moved backwards %s=>%s"(now>=poppedTime, now, poppedTime);
        ulong binsToGo = binsTillNextEntry();
        static if(INTERNAL_VERBOSITY) DEBUG!"XXX Bins to go %s phase %s"(binsToGo, phase);

        if( binsToGo == ulong.max )
            return ulong.max;

        long delta = now.cycles - poppedTime.cycles;
        long wait = binsToGo * resolutionCycles - delta;
        if( wait<0 )
            return 0;
        return wait;
    }

    /**
     * Extract one entry whose bin has been reached by `now`.
     *
     * The current bin is drained first. If it is empty, the phase is advanced (never past the next-entry hint)
     * towards `now` until an entry is found or `now` is reached.
     *
     * Params:
     *  now = the current time. Must not be earlier than the last popped time.
     *
     * Returns: the popped entry, or `T.init` if nothing was found.
     */
    @notrace T pop(TscTimePoint now) nothrow @safe @nogc {
        assertOp!"<="(poppedTime, now, "current time moved backwards");

        // If there are expired events, return those first
        auto event = bins[0][phase % numBins].popHead();
        if( event !is T.init ) {
            ASSERT!"Successfully popped element from cascaded time queue, but length is 0"(_length>0);
            _length--;
            return event;
        }

        ulong cyclesInPast = max(now.cycles - poppedTime.cycles, 0);
        ulong binsInPast = cyclesInPast / resolutionDividers[0];

        while (binsInPast>0) {
            calcNextEntryHint();

            DBG_ASSERT!"Calculated next entry hint is 0 when it shouldn't be. Phase %s bin empty state %s"(
                    nextEntryHint>0, phase, bins[0][phase % numBins].empty );

            // Never skip over the bin the hint points at
            ulong advanceCount = min( binsInPast, nextEntryHint );
            advancePhase( advanceCount );
            binsInPast -= advanceCount;

            event = bins[0][phase % numBins].popHead();
            if( event !is T.init ) {
                ASSERT!"Successfully popped element from cascaded time queue, but length is 0"(_length>0);
                _length--;
                return event;
            }
        }

        DBG_ASSERT!"Time isn't \"now\" at end of unsuccessful pop. unpopped %s now %s"(
                abs(poppedTime.cycles - now.cycles) <= resolutionCycles, poppedTime, now );

        return T.init;
    }

    /**
     * Insert an entry into the queue, according to `entry.timePoint`.
     *
     * Entries not later than the last popped time go to the current level 0 bin. Entries at or past the end of the
     * last level go to the active overflow list. Everything else goes to the first level whose end time is later
     * than the entry's time point. The next-entry hint is lowered if the new entry is due earlier than it.
     */
    void insert(T entry) nothrow @safe @nogc {
        // DMDBUG compile by package sometimes thinks this function is pure, which causes linker errors :-(
        notPure();

        // Lower the hint if the new entry is nearer than what we currently know about
        void updateHint(ulong binsInFuture) {
            if( nextEntryHint <= binsInFuture )
                return;

            static if(INTERNAL_VERBOSITY)
                DEBUG!"XXX entry hint moved %s=>%s"(nextEntryHint, binsInFuture);

            nextEntryHint = binsInFuture;
        }

        static if(INTERNAL_VERBOSITY)
            DEBUG!"XXX insert entry %s popped time %s base time %s phase %s"(
                    entry.timePoint, poppedTime, baseTimes[0], phase );

        if (entry.timePoint <= poppedTime) {
            // Already expired entries all go to the same bin
            static if(INTERNAL_VERBOSITY)
                DEBUG!"XXX insert at first bin, level 0 bin %s"(phase%numBins);
            bins[0][phase % numBins].append(entry);
            updateHint(0);
        } else if(entry.timePoint>=endTimes[numLevels-1]) {
            static if(INTERNAL_VERBOSITY)
                DEBUG!"XXX No room for entry %s in cascaded queue (ending at %s). Inserted in overflow"(
                        entry.timePoint, endTimes[numLevels-1] );
            overflow[activeOverflow].append(entry);

            ulong binsInFuture = (endTimes[numLevels-1].cycles - poppedTime.cycles) / resolutionDividers[0];
            updateHint(binsInFuture);

            version (unittest) {
                stats[numLevels]++;
            }
        } else {
            ulong binsInFuture;

            // Find out which level we need to insert at
            uint level;
            while( entry.timePoint >= endTimes[level] ) {
                binsInFuture += (numBins - phaseInLevel(level)) * rawBinsInBin(level);
                level++;
                DBG_ASSERT!"Entry %s is pass end of queue %s"(level<numLevels, entry.timePoint, endTimes[numLevels-1]);
            }

            version (unittest) {
                stats[level]++;
            }

            ulong cyclesInLevel = entry.timePoint.cycles - baseTimes[level].cycles;
            ulong idxInLevel = cyclesInLevel / resolutionDividers[level];
            DBG_ASSERT!"Phase %s in level %s bigger than idxInLevel %s. Base time %s cycles per bin %s(%s)"(
                    idxInLevel >= phaseInLevel(level), phaseInLevel(level), level, idxInLevel, baseTimes[level],
                    resolutionCycles*rawBinsInBin(level), resolutionCycles );
            ulong binsInFutureDelta = (idxInLevel - phaseInLevel(level)) * rawBinsInBin(level);
            DBG_ASSERT!
                "Insert %s bins in future %s(+%s) bigger than level %s size %s. Base time %s cycles per bin %s(%s)"
                (binsInFutureDelta<binsInLevel(level), entry.timePoint, binsInFuture, binsInFutureDelta, level,
                 binsInLevel(level), baseTimes[level], resolutionCycles*rawBinsInBin(level), resolutionCycles);
            binsInFuture += binsInFutureDelta;
            idxInLevel %= numBins;

            updateHint(binsInFuture);

            bins[level][idxInLevel].append(entry);
            static if(INTERNAL_VERBOSITY)
                DEBUG!"XXX insert at level %s bin %s binsInFuture %s"(level, idxInLevel, binsInFuture);
        }

        _length++;
    }

    /**
     * `foreach` support over all entries stored in the bins, level by level and bin by bin.
     *
     * Entries in the overflow lists are not visited.
     *
     * Returns: 0 if the iteration ran to completion, otherwise the non-zero value returned by `dg`.
     */
    int opApply(scope int delegate(T t, uint level, uint bin) @nogc dg) @nogc {
        foreach(uint level; 0..numLevels) {
            foreach(uint bin; 0..numBins) {
                foreach(ref T event; bins[level][bin].range()) {
                    // The compiler encodes break/return/goto out of the foreach body in the delegate's result.
                    // That exact value must be handed back, or anything but a plain break is mishandled.
                    int res = dg(event, level, bin);
                    if( res != 0 )
                        return res;
                }
            }
        }

        return 0;
    }

private:
    // Refresh the next-entry hint if needed, and return it
    @notrace ulong binsTillNextEntry() nothrow @safe @nogc {
        calcNextEntryHint();

        return nextEntryHint;
    }

    // Number of level 0 bins covered by a single bin of the given level
    static ulong rawBinsInBin(uint level) pure nothrow @safe @nogc {
        // level==numLevels is legal: binsInLevel(numLevels-1) asks for it
        DBG_ASSERT!"Level passed is too big %s<=%s"(level<=numLevels, level, numLevels);
        return numBins ^^ level;
    }

    // Number of level 0 bins covered by the whole of the given level
    static ulong binsInLevel(uint level) pure nothrow @safe @nogc {
        return rawBinsInBin(level+1);
    }

    // Index of the current bin inside the given level
    uint phaseInLevel(uint level) pure const nothrow @safe @nogc {
        ulong levelPhase = phase / rawBinsInBin(level);

        return levelPhase % numBins;
    }

    // Recalculate nextEntryHint. Only does any work when the current hint is 0 (i.e. it points at the current bin).
    void calcNextEntryHint() nothrow @safe @nogc {
        if( nextEntryHint!=0 )
            return;

        // The list that will be unwrapped into this level when it wraps around
        ref ListType nextLevelList(uint level) pure {
            if( level<numLevels-1 ) {
                return bins[level+1][phaseInLevel(level+1)];
            } else {
                return overflow[activeOverflow];
            }
        }

        foreach( uint level; 0..numLevels ) {
            foreach( idx; phaseInLevel(level) .. numBins ) {
                if( !bins[level][idx].empty )
                    return;

                nextEntryHint += rawBinsInBin(level);
            }

            if( !nextLevelList(level).empty ) {
                // The first bin of the next level is not empty. We have to unwrap it before we can figure out what's
                // the next event to be handled, so set the hint to point to it.
                continue;
            }

            // First bin of next level is empty, but we might have entries in this level after the fold
            ulong speculativeHintDelta;
            foreach( idx; 0 .. phaseInLevel(level) ) {
                if( !bins[level][idx].empty ) {
                    nextEntryHint += speculativeHintDelta;
                    static if(INTERNAL_VERBOSITY)
                        DEBUG!"XXX calculated new hint %s"(nextEntryHint);

                    return;
                }

                speculativeHintDelta += rawBinsInBin(level);
            }
        }

        if( overflow[activeOverflow].empty ) {
            // If we've reached here, then the entire CTQ is empty
            nextEntryHint = ulong.max;
        }

        static if(INTERNAL_VERBOSITY)
            DEBUG!"XXX calculated new hint %s"(nextEntryHint);
    }

    // Move the current bin forward by advanceCount level 0 bins, updating the levels' time ranges and cascading
    // entries from higher levels down where needed. Must not advance past the next-entry hint.
    @notrace void advancePhase( ulong advanceCount ) nothrow @safe @nogc {
        if( advanceCount==0 )
            return;

        assertOp!"<="( advanceCount, nextEntryHint, "Tried to advance the phase past the next entry" );
        uint[numLevels] oldPhases = -1;
        oldPhases[0] = phaseInLevel(0);

        bool needCascading = oldPhases[0] + advanceCount >= numBins-1;
        if( needCascading ) {
            foreach( uint level; 1..numLevels ) {
                oldPhases[level] = phaseInLevel(level);
            }
        }

        phase += advanceCount;
        if( nextEntryHint !is ulong.max )
            nextEntryHint -= advanceCount;

        poppedTime += advanceCount * resolutionCycles;

        if( !needCascading ) {
            // Stayed in same level. Only thing to do is update the level's end
            endTimes[0] += advanceCount * resolutionCycles;
            return;
        }

        uint maxAffectedLevel;
        ulong levelBinsToAdvance = advanceCount;
        foreach( uint level; 0..numLevels ) {
            maxAffectedLevel = level;

            endTimes[level] += levelBinsToAdvance * rawBinsInBin(level) * resolutionCycles;

            // How many bins of the next level did we move?
            levelBinsToAdvance += oldPhases[level];
            levelBinsToAdvance /= numBins;
            if( levelBinsToAdvance==0 )
                break;

            ulong cyclesAdvanced = levelBinsToAdvance * binsInLevel(level) * resolutionCycles;
            baseTimes[level] += cyclesAdvanced;
            DBG_ASSERT!"Level %s end level time %s does not match start level time %s (should be %s)"(
                    (endTimes[level].cycles - baseTimes[level].cycles) / (binsInLevel(level)*resolutionCycles) == 1,
                    level, endTimes[level], baseTimes[level], baseTimes[level] + binsInLevel(level)*resolutionCycles);
        }

        if( (phase % (numBins ^^ numLevels)) == 0 && !overflow[activeOverflow].empty ) {
            // The whole queue wrapped around. The overflow needs cascading too
            maxAffectedLevel = numLevels;
        }

        if( maxAffectedLevel>0 )
            cascadeLevel( maxAffectedLevel );
    }

    // Re-insert the entries of the bin just before the current one in levels 1..maxLevel. When maxLevel equals
    // numLevels, also re-insert the overflow entries that now fit inside the queue.
    @notrace void cascadeLevel( uint maxLevel ) nothrow @safe @nogc {
        // Remains true for as long as no non-empty bin was cascaded
        bool firstCascaded = true;

        uint effectiveMaxLevel = min(maxLevel, numLevels-1);
        foreach( uint level; 1..effectiveMaxLevel+1 ) {
            // The bin to clear is the one right *before* the current one
            uint binToClearIdx = previousBinIdx(level);

            auto binToClear = &bins[level][ binToClearIdx ];
            if( !binToClear.empty && firstCascaded ) {
                firstCascaded = false;
            }

            while( !binToClear.empty ) {
                auto event = binToClear.popHead();
                ASSERT!"During cascading, length is 0 when bins have entries"(_length>0);
                // insert will count the entry again
                _length--;
                insert(event);
            }
        }

        if( maxLevel==numLevels ) {
            // Cascade the overflow
            auto outOverflow = &overflow[activeOverflow];
            activeOverflow = (activeOverflow + 1) % 2; // Switch to other buffer
            auto inOverflow = &overflow[activeOverflow];

            DBG_ASSERT!"Passive overflow not empty before switch"( inOverflow.empty );

            TscTimePoint ctqEnd = endTimes[numLevels-1];
            while( !outOverflow.empty ) {
                T entry = outOverflow.popHead();
                if( entry.timePoint >= ctqEnd ) {
                    inOverflow.append(entry);
                } else {
                    /*
                       We could do only this, but as most entries in the overflow will usually remain in the overflow,
                       this if should save a few cycles.
                     */
                    DBG_ASSERT!"Length is 0 while moving entries from overflow"(_length>0);
                    _length--;
                    insert(entry);
                }
            }
        }

        DBG_ASSERT!"nextEntryHint incorrect after cascading"( firstCascaded || nextEntryHint!=ulong.max );
    }

    // Index of the bin right before the current one in the given level (wrapping around)
    uint previousBinIdx( uint level ) pure const nothrow @safe @nogc {
        return (phaseInLevel(level) + numBins - 1) % numBins;
    }

    // Touches a global so that the compiler cannot infer the caller to be pure. See the DMDBUG note at the top.
    @notrace void notPure() const nothrow @trusted @nogc {
        if( numBins==0 ) // Which it never is
            notPureDMDBUG++; // Cannot be pure.
    }
}

version(unittest):
private:

// Pop everything that is due at `now`, optionally passing each entry to `validate`. Returns whether anything was
// popped.
bool popOne(T, size_t numBins, size_t numLevels, bool hasOwner)
        (ref CascadingTimeQueue!(T, numBins, numLevels, hasOwner) ctq, TscTimePoint now, void delegate(T) validate = null)
{
    T entry;
    bool foundSomething;

    while( (entry = ctq.pop(now)) !is null ) {
        if( validate !is null )
            validate(entry);

        // Make sure the time is correct
        assert( entry.timePoint>TscTimePoint(now.cycles - ctq.resolutionCycles) &&
                entry.timePoint<TscTimePoint(now.cycles+ctq.resolutionCycles) );

        foundSomething = true;
    }

    return foundSomething;
}

// Advance `now` to the time the queue reports for its next entry. Asserts that time actually moved.
void wait(CTQ)(ref CTQ ctq, ref TscTimePoint now) {
    auto oldNow = now;
    auto delta = ctq.cyclesTillNextEntry(now);
    now = TscTimePoint( now.cycles + delta );
    DEBUG!"Advanced from %s to %s (delta %s)"(oldNow, now, delta);
    assert(delta>0);
}

unittest {
    META!"UT test time queue's levels test 1"();
    static struct Entry {
        TscTimePoint timePoint;
        Entry* _next;
        Entry* _prev;
    }

    enum resolution = 10;
    enum numBins = 4;
    enum numLevels = 3;
    CascadingTimeQueue!(Entry*, numBins, numLevels) ctq;
    ctq.open(resolution, TscTimePoint(0));

    Entry[] entries;
    entries ~= Entry(TscTimePoint(30));
    entries ~= Entry(TscTimePoint(0));
    entries ~= Entry(TscTimePoint(41));
    entries ~= Entry(TscTimePoint(70));
    entries ~= Entry(TscTimePoint(71));
    entries ~= Entry(TscTimePoint(110));
    entries ~= Entry(TscTimePoint(111));
    entries ~= Entry(TscTimePoint(150));
    entries ~= Entry(TscTimePoint(151));
    entries ~= Entry(TscTimePoint(190));
    entries ~= Entry(TscTimePoint(191));
    entries ~= Entry(TscTimePoint(350));
    entries ~= Entry(TscTimePoint(351));
    entries ~= Entry(TscTimePoint(510));
    entries ~= Entry(TscTimePoint(511));
    entries ~= Entry(TscTimePoint(643));
    entries ~= Entry(TscTimePoint(670));
    entries ~= Entry(TscTimePoint(671));
    entries ~= Entry(TscTimePoint(830));

    foreach( ref entry; entries ) {
        ctq.insert( &entry );
    }

    foreach( Entry* entry, uint level, uint bin; ctq ) {
        DEBUG!"Entry %s level %s bin %s"(*entry, level, bin);
    }

    assertEQ(entries.length, ctq.length);

    auto now = TscTimePoint(0);
    while( true ) {
        long wait = ctq.cyclesTillNextEntry(now);
        DEBUG!"now %s cycles to wait %s"( now, wait );
        if( wait == ulong.max )
            break;

        Entry* entry = ctq.pop(now);
        if( wait!=0 )
            assert( entry is null );
        else {
            if( entry !is null )
                DEBUG!"Extracted entry %s from queue"( entry.timePoint );
            else
                INFO!"Zero wait but pop returned nothing"();
        }

        now += wait;
    }

    assertEQ(0, ctq.length);
}

unittest {
    META!"UT test time queue's levels test 2"();
    import std.stdio;
    import std.string;
    import std.algorithm: count, map;
    import std.array;

    static struct Entry {
        TscTimePoint timePoint;
        string name;
        Entry* _next;
        Entry* _prev;
    }

    enum resolution = 50;
    enum numBins = 16;
    enum numLevels = 3;
    CascadingTimeQueue!(Entry*, numBins, numLevels) ctq;
    ctq.open(resolution, TscTimePoint(0));
    scope(success) ctq.close();
    static assert (ctq.spanInBins == (numBins-1) * 16^^2 + 1);

    bool[Entry*] entries;
    Entry* insert(TscTimePoint tp, string name) {
        Entry* e = new Entry(tp, name);
        ctq.insert(e);
        entries[e] = true;
        return e;
    }

    insert(90.TscTimePoint, "e1");
    insert(120.TscTimePoint, "e2");
    insert(130.TscTimePoint, "e3");
    insert(160.TscTimePoint, "e4");
    insert(TscTimePoint(resolution*numBins-1), "e5");
    insert(TscTimePoint(resolution*numBins + 10), "e6");

    long then = 0;
    foreach(long now; [10, 50, 80, 95, 100, 120, 170, 190, 210, 290, resolution*numBins, resolution*(numBins+1), resolution*(numBins+1)+1]) {
        Entry* e;
        while ((e = ctq.pop(TscTimePoint(now))) !is null) {
            scope(failure) writefln("%s:%s (%s..%s, %s)", e.name, e.timePoint, then, now, ctq.baseTimes[0]);
            assert (e.timePoint.cycles/resolution <= now/resolution, "tp=%s then=%s now=%s".format(e.timePoint, then, now));
            assert (e.timePoint.cycles/resolution >= then/resolution - 1, "tp=%s then=%s now=%s".format(e.timePoint, then, now));
            assert (e in entries);
            entries.remove(e);
        }
        then = now;
    }
    assert (entries.length == 0, "Entries not empty: %s".format(entries));

    auto e7 = insert(ctq.baseTimes[0] + resolution * (ctq.spanInBins), "e7");

    insert(ctq.baseTimes[0] + resolution * (ctq.spanInBins + ctq.binsInLevel(2)), "e8");
    assert (ctq.stats[numLevels] == 1);

    auto e = ctq.pop(e7.timePoint + resolution);
    assert (e is e7, "%s".format(e));
}

unittest {
    META!"UT test time queue with captured values"();
    import std.stdio;
    import std.string;
    import mecca.containers.pools;
    import std.algorithm: min;
    import std.random;

    static struct Entry {
        TscTimePoint timePoint;
        ulong counter;
        Entry* _next;
        Entry* _prev;
    }

    // must set these for the UT to be reproducible
    const t0 = TscTimePoint(168513482286);
    const cyclesPerSecond = 2208014020;
    const cyclesPerUsec = cyclesPerSecond / 1_000_000;
    long toCycles(Duration dur) {
        enum HECTONANO = 10_000_000;
        long hns = dur.total!"hnsecs";
        return (hns / HECTONANO) * cyclesPerSecond + ((hns % HECTONANO) * cyclesPerSecond) / HECTONANO;
    }

    void testCTQ(size_t numBins, size_t numLevels, size_t numElems)(Duration resolutionDur) {
        FixedPool!(Entry, numElems) pool;
        CascadingTimeQueue!(Entry*, numBins, numLevels) ctq;

        TscTimePoint now = t0;
        long totalInserted = 0;
        long totalPopped = 0;
        long iterationCounter = 0;
        auto span = resolutionDur * ctq.spanInBins;
        auto end = t0 + toCycles(span * 2);
        long before = toCycles(10.msecs);
        long ahead = toCycles(span/2);

        pool.open();
        ctq.open(toCycles(resolutionDur), t0);

        //uint seed = 3594633224; //1337;
        uint seed = unpredictableSeed();
        auto rand = Random(seed);
        scope(failure) writefln("seed=%s numBins=%s numLevels=%s resDur=%s iterationCounter=%s totalInserted=%s " ~
                "totalPopped=%s t0=%s now=%s", seed, numBins, numLevels, resolutionDur, iterationCounter, totalInserted,
                totalPopped, t0, now);

        void popReady(long advanceCycles) {
            auto prevNow = now;
            now += advanceCycles;
            uint numPopped = 0;
            Entry* e;
            while ((e = ctq.pop(now)) !is null) {
                assert (e.timePoint <= now, "tp=%s prevNow=%s now=%s".format(e.timePoint, prevNow, now));
                //assert (e.timePoint/ctq.baseFrequencyCyclesDenom >= prevNow/ctq.baseFrequencyCyclesDenom, "tp=%s prevNow=%s now=%s".format(e.timePoint, prevNow, now));
                numPopped++;
                pool.release(e);
            }
            //writefln("%8d..%8d: %s", (prevNow - t0) / cyclesPerUsec, (now - t0) / cyclesPerUsec, numPopped);
            totalPopped += numPopped;
        }

        while (now < end) {
            while (pool.numAvailable > 0) {
                auto e = pool.alloc();
                e.timePoint = TscTimePoint(uniform(now.cycles - before, min(end.cycles, now.cycles + ahead), rand));
                e.counter = totalInserted++;
                //writefln("insert[%s] at %s", e.counter, (e.timePoint - t0) / cyclesPerUsec);
                ctq.insert(e);
            }
            auto us = uniform(0, 130, rand);
            if (us > 120) {
                us = uniform(100, 1500, rand);
            }
            popReady(us * cyclesPerUsec);
            iterationCounter++;
        }
        popReady(ahead + ctq.resolutionCycles);
        auto covered = ctq.baseTimes[0].diff!"cycles"(t0) / double(cyclesPerSecond);
        auto expectedCover = span.total!"msecs" * (2.5 / 1000);
        assert (covered >= expectedCover - 2, "%s %s".format(covered, expectedCover));

        writeln(totalInserted, " ", totalPopped, " ", ctq.stats);
        foreach(i, s; ctq.stats[0..numLevels]) {
            assert (s > 0, "level %s never received events".format(i));
        }

        assert (totalInserted - totalPopped == pool.numInUse, "(1) pool.used=%s inserted=%s popped=%s".format(pool.numInUse, totalInserted, totalPopped));
        assert (totalInserted == totalPopped, "(2) pool.used=%s inserted=%s popped=%s".format(pool.numInUse, totalInserted, totalPopped));
        assert (totalInserted > numElems * 2, "totalInserted=%s".format(totalInserted));
    }

    int numRuns = 0;
    foreach(numElems; [10_000 /+, 300, 1000, 4000, 5000+/]) {
        // spans 878s
        testCTQ!(256, 3, 10_000)(50.usecs);
        numRuns++;
    }
    assert (numRuns > 0);
}

unittest {
    META!"UT testing time queue with actual times"();
    import mecca.log;
    import std.string;

    static struct Entry {
        TscTimePoint timePoint;
        string name;
        Entry* _next;
        Entry* _prev;
    }

    // The actual resolution is going to be slightly different than this, but it should only be more accurate, never less.
    enum resolution = dur!"msecs"(1);
    enum numBins = 16;
    enum numLevels = 3;

    TscTimePoint now = TscTimePoint.hardNow;

    CascadingTimeQueue!(Entry*, numBins, numLevels) ctq;
    ctq.open(resolution, now);

    Entry[6] entries;
    enum L0Duration = resolution * numBins;
    enum L1Duration = L0Duration * numBins;

    entries[0] = Entry(now + resolution /2, "e0 l0 b1");
    entries[1] = Entry(now + resolution + 2, "e1 l0 b2");
    entries[2] = Entry(now + resolution * 10 + 3, "e2 l0 b11");
    entries[3] = Entry(now + L0Duration + resolution * 3 + 2, "e3 l1 b0 l0 b4");
    entries[4] = Entry(now + L0Duration*7 + resolution * 5 + 7, "e4 l1 b6 l0 b6");
    entries[5] = Entry(now + L1Duration + 14, "e5 l2 b0 l0 b1");

    foreach( ref e; entries ) {
        INFO!"Entry %s at %s"( e.name, (e.timePoint - now).toString );
    }

    // Insert out of order
    ctq.insert(&entries[3]);
    ctq.insert(&entries[1]);
    ctq.insert(&entries[4]);
    ctq.insert(&entries[5]);
    ctq.insert(&entries[2]);
    ctq.insert(&entries[0]);

    uint nextIdx = 0;

    assert(ctq.pop(now) is null, "First entry received too soon");

    auto base = now;
    while(nextIdx<entries.length) {
        auto step = ctq.cyclesTillNextEntry(now);
        now += step;
        DEBUG!"Setting time forward by %s to %s"(TscTimePoint.toDuration(step).toString, (now - base).toString);
        Entry* e = ctq.pop(now);

        if( e !is null ) {
            INFO!"Got entry %s from queue"(e.name);
            assert( e.name == entries[nextIdx].name, "Pop returned incorrect entry, expected %s, got %s".format(entries[nextIdx].name,
                        e.name) );
            assert( e.timePoint>(now - resolution) && e.timePoint<=now,
                    "Pop returned entry %s at an incorrect time. Current %s expected %s".format(e.name, now-base, e.timePoint-base) );
            nextIdx++;
        } else {
            DEBUG!"Got empty entry from queue"();
        }
    }
}

/+
unittest {
    META!"UT testing run with a reactor"();
    import mecca.reactor;
    Reactor.OpenOptions options;
    options.timerGranularity = 100.usecs;

    int numRuns;

    FiberHandle handle;

    void looper() {
        DEBUG!"Num runs %s"(numRuns);
        if( ++numRuns > 100 )
            theReactor.resumeFiber(handle);
    }

    void testBody() {
        handle = theReactor.currentFiberHandle();

        theReactor.registerRecurringTimer(dur!"seconds"(1), &looper);

        theReactor.suspendCurrentFiber();
    }

    testWithReactor(&testBody, options);

    DEBUG!"Total runs %s"(numRuns);
}
+/

unittest {
    META!"UT testing random time queue operations"();
    // Test the cascading
    import std.random;

    Mt19937 random;
    auto seed = unpredictableSeed;
    random.seed(seed);
    scope(failure) INFO!"Running with seed %s"(seed);

    enum long resolution = 4;
    enum numBins = 4;
    enum numLevels = 4;

    struct Entry {
        TscTimePoint timePoint;
        uint id;
        Entry* _next;
        Entry* _prev;
    }

    TscTimePoint[uint] entries;
    uint nextId;

    CascadingTimeQueue!(Entry*, numBins, numLevels) ctq;
    auto now = TscTimePoint(0);
    ctq.open(resolution, now);
    size_t numEntriesInQueue;

    void insert() {
        Entry* entry;
        entry = new Entry;

        uint level = uniform(0, numLevels-1, random);
        // The time range covered by the whole of the randomly picked level
        ulong range = resolution * ctq.binsInLevel(level);

        entry.timePoint = TscTimePoint( now.cycles + uniform(0, range, random) );
        entry.id = nextId++;

        entries[entry.id] = entry.timePoint;

        DEBUG!"Pushing entry %s timepoint %s"(entry.id, entry.timePoint);
        ctq.insert(entry);
        numEntriesInQueue++;
        assertEQ( ctq.length, numEntriesInQueue );
    }

    void popAll() {
        Entry* entry;
        while( (entry = ctq.pop(now)) !is null ) {
            DEBUG!"At %s popped entry %s timepoint %s"(now, entry.id, entry.timePoint);
            numEntriesInQueue--;
            assertEQ( numEntriesInQueue, ctq.length );
            assert( entries[entry.id] == entry.timePoint );
            entries.remove(entry.id);

            // Make sure the time is correct
            ASSERT!"entry %s popped at incorrect time. %s<%s<%s"(
                    entry.timePoint>TscTimePoint(now.cycles - resolution) &&
                    entry.timePoint<TscTimePoint(now.cycles+resolution),
                    entry.id,
                    TscTimePoint(now.cycles - resolution),
                    entry.timePoint,
                    TscTimePoint(now.cycles + resolution));
        }
    }

    foreach(i; 0..10) {
        insert();
    }

    while( entries.length > 0 ) {
        popAll();

        if( nextId<1000 ) {
            insert();
            insert();
        }
        auto oldNow = now;
        now = TscTimePoint( now.cycles + ctq.cyclesTillNextEntry(now) );
        DEBUG!"Advanced from %s to %s phase %s(%s)"(oldNow, now, ctq.phaseInLevel(0), ctq.phase);
    }

    assertEQ( ctq.length, 0 );
}

unittest {
    // Expose a specific problem not visible under random testing
    META!"UT for specific problems found"();

    enum long resolution = 10;
    enum numBins = 4;
    enum numLevels = 3;

    struct Entry {
        TscTimePoint timePoint;
        uint id;
        Entry* _next;
        Entry* _prev;
    }

    TscTimePoint[uint] entries;
    uint nextId;

    CascadingTimeQueue!(Entry*, numBins, numLevels) ctq;
    auto now = TscTimePoint(0);
    ctq.open(resolution, now);

    void insert(TscTimePoint time) {
        Entry* entry;
        entry = new Entry;

        entry.timePoint = time;
        entry.id = nextId++;

        entries[entry.id] = entry.timePoint;

        DEBUG!"Pushing entry %s timepoint %s"(entry.id, entry.timePoint);
        ctq.insert(entry);
    }

    void validate(Entry* entry) {
        DEBUG!"At %s popped entry %s timepoint %s"(now, entry.id, entry.timePoint);
        assert( entries[entry.id] == entry.timePoint );
        entries.remove(entry.id);
    }

    bool popOne() {
        return .popOne(ctq, now, &validate);
    }

    void popAll() {
        while( !popOne() ) {
            wait(ctq, now);
        }
    }

    // insert a point that goes in the first bin of the second level
    insert( TscTimePoint(9) );
    popOne();
    //now += 9; // Give it a time point that is almost, but not quite, to the next bucket
    Entry* tmpRes = ctq.pop(now); // This should not return anything
    assert( tmpRes is null );

    /* Now insert something that is distant enough from our current time to be in the first bucket of the second level,
       but from the starting point should go into the second bucket of the second level.
     */
    insert( TscTimePoint(91) );
    insert( TscTimePoint(99) );
    popOne();
    popOne();
}

unittest {
    META!"UT for overflow support"();

    enum long resolution = 4;
    enum numBins = 4;
    enum numLevels = 2;

    struct Entry {
        TscTimePoint timePoint;
        Entry* _next;
        Entry* _prev;
    }

    Entry[] entries;

    // Reserve up front: the queue holds pointers into this array, so it must not be reallocated by the appends below
    entries.reserve(10);
    CascadingTimeQueue!(Entry*, numBins, numLevels) ctq;
    auto now = TscTimePoint(0);
    ctq.open(resolution, now);
    scope(exit) ctq.close();

    void insert(TscTimePoint point) {
        entries ~= Entry(point);
        ctq.insert(&entries[$-1]);
    }

    void popOne() {
        while( !.popOne(ctq, now) ) {
            wait(ctq, now);
        }
    }

    insert(TscTimePoint(74)); // Level 1 bin 3 => L0 b3
    insert(TscTimePoint(101)); // Level 2 => L1 b1

    popOne();
    auto timeToWait = ctq.binsTillNextEntry();
    assert( timeToWait!=ulong.max, "CTQ failed to account for entry in overflow" );

    // After the pop pointer has moved, insert two points before and after the second point
    insert(TscTimePoint(90)); // L1 b0
    insert(TscTimePoint(120)); // L1 b2

    popOne();
    popOne();
    popOne();

    timeToWait = ctq.binsTillNextEntry();
    assertEQ( timeToWait, ulong.max, "Empty CTQ does not return infinite wait" );
}