1 module mecca.lib.time_queue;
2 
3 // Licensed under the Boost license. Full copyright information in the AUTHORS file
4 
5 import std.algorithm : min, max;
6 import std.math : abs;
7 
8 import mecca.containers.lists;
9 import mecca.lib.division: S64Divisor;
10 import mecca.lib.exception;
11 import mecca.lib.time;
12 import mecca.lib.reflection;
13 import mecca.log;
14 
15 // DMDBUG define a global for the sole purpose of forcing functions to not be misidentified as pure. See notPure()
16 private __gshared uint notPureDMDBUG;
17 
18 struct CascadingTimeQueue(T, size_t numBins, size_t numLevels, bool hasOwner = false) {
19 private:
20     debug(timeq_verbosity) {
21         enum INTERNAL_VERBOSITY = true;
22     } else {
23         enum INTERNAL_VERBOSITY = false;
24     }
25 
26     static assert (numBins>1, "Seriously? What were you thining?");
27     static assert ((numBins & (numBins - 1)) == 0, "numBins must be a power of 2");
28     static assert (numLevels > 1);
29     static assert (numBins * numLevels < 256*8);
30     // Minimal maximal span is the size of all bins in the last level + one bin of the first level. Not exported because
31     // meaningless, as we have the overflow to catch uses outside this span.
32     enum spanInBins = (numBins-1) * rawBinsInBin(numLevels-1) + 1;
33 
34     alias Phase = ulong;
35 
36     TscTimePoint[numLevels] baseTimes; // Time the first bin of each level begins on
37     TscTimePoint[numLevels] endTimes; // The time point that marks bins no longer available in this level
38     TscTimePoint poppedTime; // Marks the END of the bin currently pointed to by phase (last timestamp in bin)
39     long resolutionCycles;
40     ulong nextEntryHint = ulong.max; // Next active bin not before this number of bins (max = invalid)
41     S64Divisor[numLevels] resolutionDividers;
42     Phase phase;
43     size_t _length; // Number of elements in the queue
44     version(unittest) {
45         ulong[numLevels+1] stats;
46     }
47 
48     static if( hasOwner )
49         alias ListType = LinkedListWithOwner!T;
50     else
51         alias ListType = LinkedList!T;
52 
53     ListType[numBins][numLevels] bins;
54     ListType[2] overflow;
55     uint activeOverflow;
56 
57 public:
58     static if( hasOwner ) {
59         alias OwnerAttrType = ListType*;
60     }
61 
62     void open(Duration resolution, TscTimePoint startTime = TscTimePoint.hardNow) @safe @nogc {
63         open(TscTimePoint.toCycles(resolution), startTime);
64     }
65 
66     void open(long resolutionCycles, TscTimePoint startTime) @safe @nogc {
67         assert (resolutionCycles > 0);
68         assertEQ (_length, 0);
69         this.resolutionCycles = resolutionCycles;
70         foreach( uint level; 0..numLevels ) {
71             this.resolutionDividers[level] = S64Divisor(resolutionCycles*rawBinsInBin(level));
72         }
73         this.phase = 0;
74         this.nextEntryHint = ulong.max;
75         version (unittest) {
76             this.stats[] = 0;
77         }
78 
79         this.poppedTime = startTime;
80         this.baseTimes[0] = this.poppedTime;
81         this.baseTimes[0] -= resolutionCycles - 1; // First bin in first level is for already expired jobs
82         foreach( uint level; 0..numLevels ) {
83             this.endTimes[level] = this.baseTimes[level];
84             this.endTimes[level] += binsInLevel(level) * resolutionCycles;
85 
86             if( level==numLevels-1 )
87                 break;
88 
89             // On init, next level begins where current level ends
90             this.baseTimes[level+1] = this.endTimes[level];
91         }
92     }
93 
94     void close() nothrow @safe @nogc {
95         foreach(ref lvl; bins) {
96             foreach(ref bin; lvl) {
97                 while( !bin.empty ) {
98                     _length--;
99                     bin.popHead;
100                 }
101             }
102         }
103 
104         ASSERT!"Standby overflow bin not empty"( overflow[1-activeOverflow].empty );
105         while( !overflow[activeOverflow].empty ) {
106             _length--;
107             overflow[activeOverflow].popHead;
108         }
109 
110         ASSERT!"At end of CascadingTimeQueue.close() _length is %s (expected 0)"(_length==0, _length);
111     }
112 
113     @property size_t length() pure const nothrow @safe @nogc {
114         return _length;
115     }
116 
117     static if( hasOwner ) {
118         void cancel(T entry) nothrow @safe @nogc {
119             ListType.discard(entry);
120             _length--;
121         }
122     }
123 
124     @notrace Duration timeTillNextEntry(TscTimePoint now) nothrow @safe @nogc {
125         ulong cycles = cyclesTillNextEntry(now);
126 
127         if( cycles==ulong.max )
128             return Duration.max;
129 
130         return TscTimePoint.toDuration(cycles);
131     }
132 
133     ulong cyclesTillNextEntry(TscTimePoint now) nothrow @safe @nogc {
134         DBG_ASSERT!"time moved backwards %s=>%s"(now>=poppedTime, now, poppedTime);
135         ulong binsToGo = binsTillNextEntry();
136         static if(INTERNAL_VERBOSITY) DEBUG!"XXX Bins to go %s phase %s"(binsToGo, phase);
137 
138         if( binsToGo == ulong.max )
139             return ulong.max;
140 
141         long delta = now.cycles - poppedTime.cycles;
142         long wait = binsToGo * resolutionCycles - delta;
143         if( wait<0 )
144             return 0;
145         return wait;
146     }
147 
148     @notrace T pop(TscTimePoint now) nothrow @safe @nogc {
149         assertOp!"<="(poppedTime, now, "current time moved backwards");
150 
151         // If there are expired events, return those first
152         auto event = bins[0][phase % numBins].popHead();
153         if( event !is T.init ) {
154             ASSERT!"Successfully popped element from cascaded time queue, but length is 0"(_length>0);
155             _length--;
156             return event;
157         }
158 
159         ulong cyclesInPast = max(now.cycles - poppedTime.cycles, 0);
160         ulong binsInPast = cyclesInPast / resolutionDividers[0];
161 
162         while (binsInPast>0) {
163             calcNextEntryHint();
164 
165             DBG_ASSERT!"Calculated next entry hint is 0 when it shouldn't be. Phase %s bin empty state %s"(
166                     nextEntryHint>0, phase, bins[0][phase % numBins].empty );
167 
168             ulong advanceCount = min( binsInPast, nextEntryHint );
169             advancePhase( advanceCount );
170             binsInPast -= advanceCount;
171 
172             event = bins[0][phase % numBins].popHead();
173             if( event !is T.init ) {
174                 ASSERT!"Successfully popped element from cascaded time queue, but length is 0"(_length>0);
175                 _length--;
176                 return event;
177             }
178         }
179 
180         DBG_ASSERT!"Time isn't \"now\" at end of unsuccessful pop. unpopped %s now %s"(
181                 abs(poppedTime.cycles - now.cycles) <= resolutionCycles, poppedTime, now );
182 
183         return T.init;
184     }
185 
186     void insert(T entry) nothrow @safe @nogc {
187         // DMDBUG compile by package sometimes thinks this function is pure, which causes linker errors :-(
188         notPure();
189 
190         void updateHint(ulong binsInFuture) {
191             if( nextEntryHint <= binsInFuture )
192                 return;
193 
194             static if(INTERNAL_VERBOSITY)
195                 DEBUG!"XXX entry hint moved %s=>%s"(nextEntryHint, binsInFuture);
196 
197             nextEntryHint = binsInFuture;
198         }
199 
200         static if(INTERNAL_VERBOSITY)
201             DEBUG!"XXX insert entry %s popped time %s base time %s phase %s"(
202                     entry.timePoint, poppedTime, baseTimes[0], phase );
203 
204         if (entry.timePoint <= poppedTime) {
205             // Already expired entries all go to the same bin
206             static if(INTERNAL_VERBOSITY)
207                 DEBUG!"XXX insert at first bin, level 0 bin %s"(phase%numBins);
208             bins[0][phase % numBins].append(entry);
209             updateHint(0);
210         } else if(entry.timePoint>=endTimes[numLevels-1]) {
211             static if(INTERNAL_VERBOSITY)
212                 DEBUG!"XXX No room for entry %s in cascaded queue (ending at %s). Inserted in overflow"(
213                         entry.timePoint, endTimes[numLevels-1] );
214             overflow[activeOverflow].append(entry);
215 
216             ulong binsInFuture = (endTimes[numLevels-1].cycles - poppedTime.cycles) / resolutionDividers[0];
217             updateHint(binsInFuture);
218 
219             version (unittest) {
220                 stats[numLevels]++;
221             }
222         } else {
223             ulong binsInFuture;
224 
225             // Find out which level we need to insert at
226             uint level;
227             while( entry.timePoint >= endTimes[level] ) {
228                 binsInFuture += (numBins - phaseInLevel(level)) * rawBinsInBin(level);
229                 level++;
230                 DBG_ASSERT!"Entry %s is pass end of queue %s"(level<numLevels, entry.timePoint, endTimes[numLevels-1]);
231             }
232 
233             version (unittest) {
234                 stats[level]++;
235             }
236 
237             ulong cyclesInLevel = entry.timePoint.cycles - baseTimes[level].cycles;
238             ulong idxInLevel = cyclesInLevel / resolutionDividers[level];
239             DBG_ASSERT!"Phase %s in level %s bigger than idxInLevel %s. Base time %s cycles per bin %s(%s)"(
240                     idxInLevel >= phaseInLevel(level), phaseInLevel(level), level, idxInLevel, baseTimes[level],
241                     resolutionCycles*rawBinsInBin(level), resolutionCycles );
242             ulong binsInFutureDelta = (idxInLevel - phaseInLevel(level)) * rawBinsInBin(level);
243             DBG_ASSERT!
244                     "Insert %s bins in future %s(+%s) bigger than level %s size %s. Base time %s cycles per bin %s(%s)"
245                 (binsInFutureDelta<binsInLevel(level), entry.timePoint, binsInFuture, binsInFutureDelta, level,
246                     binsInLevel(level), baseTimes[level], resolutionCycles*rawBinsInBin(level), resolutionCycles);
247             binsInFuture += binsInFutureDelta;
248             idxInLevel %= numBins;
249 
250             updateHint(binsInFuture);
251 
252             bins[level][idxInLevel].append(entry);
253             static if(INTERNAL_VERBOSITY)
254                 DEBUG!"XXX insert at level %s bin %s binsInFuture %s"(level, idxInLevel, binsInFuture);
255         }
256 
257         _length++;
258     }
259 
260     int opApply(scope int delegate(T t, uint level, uint bin) @nogc dg) @nogc {
261         foreach(uint level; 0..numLevels) {
262             foreach(uint bin; 0..numBins) {
263                 foreach(ref T event; bins[level][bin].range()) {
264                     if( dg(event, level, bin) )
265                         return 1;
266                 }
267             }
268         }
269 
270         return 0;
271     }
272 
273 private:
274     @notrace ulong binsTillNextEntry() nothrow @safe @nogc {
275         calcNextEntryHint();
276 
277         return nextEntryHint;
278     }
279 
280     static ulong rawBinsInBin(uint level) pure nothrow @safe @nogc {
281         DBG_ASSERT!"Level passed is too big %s<%s"(level<=numLevels, level, numLevels);
282         return numBins ^^ level;
283     }
284 
285     static ulong binsInLevel(uint level) pure nothrow @safe @nogc {
286         return rawBinsInBin(level+1);
287     }
288 
289     uint phaseInLevel(uint level) pure const nothrow @safe @nogc {
290         ulong levelPhase = phase / rawBinsInBin(level);
291 
292         return levelPhase % numBins;
293     }
294 
295     void calcNextEntryHint() nothrow @safe @nogc {
296         if( nextEntryHint!=0 )
297             return;
298 
299         ref ListType nextLevelList(uint level) pure {
300             if( level<numLevels-1 ) {
301                 return bins[level+1][phaseInLevel(level+1)];
302             } else {
303                 return overflow[activeOverflow];
304             }
305         }
306 
307         foreach( uint level; 0..numLevels ) {
308             foreach( idx; phaseInLevel(level) .. numBins ) {
309                 if( !bins[level][idx].empty )
310                     return;
311 
312                 nextEntryHint += rawBinsInBin(level);
313             }
314 
315             if( !nextLevelList(level).empty ) {
316                 // The first bin of the next level is not empty. We have to unwrap it before we can figure out what's
317                 // the next event to be handled, so set the hint to point to it.
318                 continue;
319             }
320 
321             // First bin of next level is empty, but we might have entries in this level after the fold
322             ulong speculativeHintDelta;
323             foreach( idx; 0 .. phaseInLevel(level) ) {
324                 if( !bins[level][idx].empty ) {
325                     nextEntryHint += speculativeHintDelta;
326                     static if(INTERNAL_VERBOSITY)
327                         DEBUG!"XXX calculated new hint %s"(nextEntryHint);
328 
329                     return;
330                 }
331 
332                 speculativeHintDelta += rawBinsInBin(level);
333             }
334         }
335 
336         if( overflow[activeOverflow].empty ) {
337             // If we've reached here, then the entire CTQ is empty
338             nextEntryHint = ulong.max;
339         }
340 
341         static if(INTERNAL_VERBOSITY)
342             DEBUG!"XXX calculated new hint %s"(nextEntryHint);
343     }
344 
345     @notrace void advancePhase( ulong advanceCount ) nothrow @safe @nogc {
346         if( advanceCount==0 )
347             return;
348 
349         assertOp!"<="( advanceCount, nextEntryHint, "Tried to advance the phase past the next entry" );
350         uint[numLevels] oldPhases = -1;
351         oldPhases[0] = phaseInLevel(0);
352 
353         bool needCascading = oldPhases[0] + advanceCount >= numBins-1;
354         if( needCascading ) {
355             foreach( uint level; 1..numLevels ) {
356                 oldPhases[level] = phaseInLevel(level);
357             }
358         }
359 
360         phase += advanceCount;
361         if( nextEntryHint !is ulong.max )
362             nextEntryHint -= advanceCount;
363 
364         poppedTime += advanceCount * resolutionCycles;
365 
366         if( !needCascading ) {
367             // Stayed in same level. Only thing to do is update the level's end
368             endTimes[0] += advanceCount * resolutionCycles;
369             return;
370         }
371 
372         uint maxAffectedLevel;
373         ulong levelBinsToAdvance = advanceCount;
374         foreach( uint level; 0..numLevels ) {
375             maxAffectedLevel = level;
376 
377             endTimes[level] += levelBinsToAdvance * rawBinsInBin(level) * resolutionCycles;
378 
379             levelBinsToAdvance += oldPhases[level];
380             levelBinsToAdvance /= numBins;
381             if( levelBinsToAdvance==0 )
382                 break;
383 
384             ulong cyclesAdvanced = levelBinsToAdvance * binsInLevel(level) * resolutionCycles;
385             baseTimes[level] += cyclesAdvanced;
386             DBG_ASSERT!"Level %s end level time %s does not match start level time %s (should be %s)"(
387                 (endTimes[level].cycles - baseTimes[level].cycles) / (binsInLevel(level)*resolutionCycles) == 1,
388                 level, endTimes[level], baseTimes[level], baseTimes[level] + binsInLevel(level)*resolutionCycles);
389         }
390 
391         if( (phase % (numBins ^^ numLevels)) == 0 && !overflow[activeOverflow].empty ) {
392             maxAffectedLevel = numLevels;
393         }
394 
395         if( maxAffectedLevel>0 )
396             cascadeLevel( maxAffectedLevel );
397     }
398 
399     @notrace void cascadeLevel( uint maxLevel ) nothrow @safe @nogc {
400         bool firstCascaded = true;
401 
402         uint effectiveMaxLevel = min(maxLevel, numLevels-1);
403         foreach( uint level; 1..effectiveMaxLevel+1 ) {
404             // The bin to clear is the one right *before* the current one
405             uint binToClearIdx = previousBinIdx(level);
406 
407             auto binToClear = &bins[level][ binToClearIdx ];
408             if( !binToClear.empty && firstCascaded ) {
409                 firstCascaded = false;
410             }
411 
412             while( !binToClear.empty ) {
413                 auto event = binToClear.popHead();
414                 ASSERT!"During cascading, length is 0 when bins have entries"(_length>0);
415                 _length--;
416                 insert(event);
417             }
418         }
419 
420         if( maxLevel==numLevels ) {
421             // Cascade the overflow
422             auto outOverflow = &overflow[activeOverflow];
423             activeOverflow = (activeOverflow + 1) % 2; // Switch to other buffer
424             auto inOverflow = &overflow[activeOverflow];
425 
426             DBG_ASSERT!"Passive overflow not empty before switch"( inOverflow.empty );
427 
428             TscTimePoint ctqEnd = endTimes[numLevels-1];
429             while( !outOverflow.empty ) {
430                 T entry = outOverflow.popHead();
431                 if( entry.timePoint >= ctqEnd ) {
432                     inOverflow.append(entry);
433                 } else {
434                     /*
435                       We could do only this, but as most entries in the overflow will usually remain in the overflow,
436                       this if should save a few cycles.
437                     */
438                     DBG_ASSERT!"Length is 0 while moving entries from overflow"(_length>0);
439                     _length--;
440                     insert(entry);
441                 }
442             }
443         }
444 
445         DBG_ASSERT!"nextEntryHint incorrect after cascading"( firstCascaded || nextEntryHint!=ulong.max );
446     }
447 
448     uint previousBinIdx( uint level ) pure const nothrow @safe @nogc {
449         return (phaseInLevel(level) + numBins - 1) % numBins;
450     }
451 
452     @notrace void notPure() const nothrow @trusted @nogc {
453         if( numBins==0 ) // Which it never is
454             notPureDMDBUG++; // Cannot be pure.
455     }
456 }
457 
458 version(unittest):
459 private:
460 bool popOne(T, size_t numBins, size_t numLevels, bool hasOwner)
461     (ref CascadingTimeQueue!(T, numBins, numLevels, hasOwner) ctq, TscTimePoint now, void delegate(T) validate = null)
462 {
463     T entry;
464     bool foundSomething;
465 
466     while( (entry = ctq.pop(now)) !is null ) {
467         if( validate !is null )
468             validate(entry);
469 
470         // Make sure the time is correct
471         assert( entry.timePoint>TscTimePoint(now.cycles - ctq.resolutionCycles) &&
472                 entry.timePoint<TscTimePoint(now.cycles+ctq.resolutionCycles) );
473 
474         foundSomething = true;
475     }
476 
477     return foundSomething;
478 }
479 
480 void wait(CTQ)(ref CTQ ctq, ref TscTimePoint now) {
481     auto oldNow = now;
482     auto delta = ctq.cyclesTillNextEntry(now);
483     now = TscTimePoint( now.cycles + delta );
484     DEBUG!"Advanced from %s to %s (delta %s)"(oldNow, now, delta);
485     assert(delta>0);
486 }
487 
488 unittest {
489     META!"UT test time queue's levels test 1"();
490     static struct Entry {
491         TscTimePoint timePoint;
492         Entry* _next;
493         Entry* _prev;
494     }
495 
496     enum resolution = 10;
497     enum numBins = 4;
498     enum numLevels = 3;
499     CascadingTimeQueue!(Entry*, numBins, numLevels) ctq;
500     ctq.open(resolution, TscTimePoint(0));
501 
502     Entry[] entries;
503     entries ~= Entry(TscTimePoint(30));
504     entries ~= Entry(TscTimePoint(0));
505     entries ~= Entry(TscTimePoint(41));
506     entries ~= Entry(TscTimePoint(70));
507     entries ~= Entry(TscTimePoint(71));
508     entries ~= Entry(TscTimePoint(110));
509     entries ~= Entry(TscTimePoint(111));
510     entries ~= Entry(TscTimePoint(150));
511     entries ~= Entry(TscTimePoint(151));
512     entries ~= Entry(TscTimePoint(190));
513     entries ~= Entry(TscTimePoint(191));
514     entries ~= Entry(TscTimePoint(350));
515     entries ~= Entry(TscTimePoint(351));
516     entries ~= Entry(TscTimePoint(510));
517     entries ~= Entry(TscTimePoint(511));
518     entries ~= Entry(TscTimePoint(643));
519     entries ~= Entry(TscTimePoint(670));
520     entries ~= Entry(TscTimePoint(671));
521     entries ~= Entry(TscTimePoint(830));
522 
523     foreach( ref entry; entries ) {
524         ctq.insert( &entry );
525     }
526 
527     foreach( Entry* entry, uint level, uint bin; ctq ) {
528         DEBUG!"Entry %s level %s bin %s"(*entry, level, bin);
529     }
530 
531     assertEQ(entries.length, ctq.length);
532 
533     auto now = TscTimePoint(0);
534     while( true ) {
535         long wait = ctq.cyclesTillNextEntry(now);
536         DEBUG!"now %s cycles to wait %s"( now, wait );
537         if( wait == ulong.max )
538             break;
539 
540         Entry* entry = ctq.pop(now);
541         if( wait!=0 )
542             assert( entry is null );
543         else {
544             if( entry !is null )
545                 DEBUG!"Extracted entry %s from queue"( entry.timePoint );
546             else
547                 INFO!"Non-zero wait but pop returned nothing"();
548         }
549 
550         now += wait;
551     }
552 
553     assertEQ(0, ctq.length);
554 }
555 
556 unittest {
557     META!"UT test time queue's levels test 2"();
558     import std.stdio;
559     import std..string;
560     import std.algorithm: count, map;
561     import std.array;
562 
563     static struct Entry {
564         TscTimePoint timePoint;
565         string name;
566         Entry* _next;
567         Entry* _prev;
568     }
569 
570     enum resolution = 50;
571     enum numBins = 16;
572     enum numLevels = 3;
573     CascadingTimeQueue!(Entry*, numBins, numLevels) ctq;
574     ctq.open(resolution, TscTimePoint(0));
575     scope(success) ctq.close();
576     static assert (ctq.spanInBins == (numBins-1) * 16^^2 + 1);
577 
578     bool[Entry*] entries;
579     Entry* insert(TscTimePoint tp, string name) {
580         Entry* e = new Entry(tp, name);
581         ctq.insert(e);
582         entries[e] = true;
583         return e;
584     }
585 
586     insert(90.TscTimePoint, "e1");
587     insert(120.TscTimePoint, "e2");
588     insert(130.TscTimePoint, "e3");
589     insert(160.TscTimePoint, "e4");
590     insert(TscTimePoint(resolution*numBins-1), "e5");
591     insert(TscTimePoint(resolution*numBins + 10), "e6");
592 
593     long then = 0;
594     foreach(long now; [10, 50, 80, 95, 100, 120, 170, 190, 210, 290, resolution*numBins, resolution*(numBins+1), resolution*(numBins+1)+1]) {
595         Entry* e;
596         while ((e = ctq.pop(TscTimePoint(now))) !is null) {
597             scope(failure) writefln("%s:%s (%s..%s, %s)", e.name, e.timePoint, then, now, ctq.baseTimes[0]);
598             assert (e.timePoint.cycles/resolution <= now/resolution, "tp=%s then=%s now=%s".format(e.timePoint, then, now));
599             assert (e.timePoint.cycles/resolution >= then/resolution - 1, "tp=%s then=%s now=%s".format(e.timePoint, then, now));
600             assert (e in entries);
601             entries.remove(e);
602         }
603         then = now;
604     }
605     assert (entries.length == 0, "Entries not empty: %s".format(entries));
606 
607     auto e7 = insert(ctq.baseTimes[0] + resolution * (ctq.spanInBins), "e7");
608 
609     insert(ctq.baseTimes[0] + resolution * (ctq.spanInBins + ctq.binsInLevel(2)), "e8");
610     assert (ctq.stats[numLevels] == 1);
611 
612     auto e = ctq.pop(e7.timePoint + resolution);
613     assert (e is e7, "%s".format(e));
614 }
615 
616 unittest {
617     META!"UT test time queue with captured values"();
618     import std.stdio;
619     import std..string;
620     import mecca.containers.pools;
621     import std.algorithm: min;
622     import std.random;
623 
624     static struct Entry {
625         TscTimePoint timePoint;
626         ulong counter;
627         Entry* _next;
628         Entry* _prev;
629     }
630 
631     // must set these for the UT to be reproducible
632     const t0 = TscTimePoint(168513482286);
633     const cyclesPerSecond = 2208014020;
634     const cyclesPerUsec = cyclesPerSecond / 1_000_000;
635     long toCycles(Duration dur) {
636         enum HECTONANO = 10_000_000;
637         long hns = dur.total!"hnsecs";
638         return (hns / HECTONANO) * cyclesPerSecond + ((hns % HECTONANO) * cyclesPerSecond) / HECTONANO;
639     }
640 
641     void testCTQ(size_t numBins, size_t numLevels, size_t numElems)(Duration resolutionDur) {
642         FixedPool!(Entry, numElems) pool;
643         CascadingTimeQueue!(Entry*, numBins, numLevels) ctq;
644 
645         TscTimePoint now = t0;
646         long totalInserted = 0;
647         long totalPopped = 0;
648         long iterationCounter = 0;
649         auto span = resolutionDur * ctq.spanInBins;
650         auto end = t0 + toCycles(span * 2);
651         long before = toCycles(10.msecs);
652         long ahead = toCycles(span/2);
653 
654         pool.open();
655         ctq.open(toCycles(resolutionDur), t0);
656 
657         //uint seed = 3594633224; //1337;
658         uint seed = unpredictableSeed();
659         auto rand = Random(seed);
660         scope(failure) writefln("seed=%s numBins=%s numLevels=%s resDur=%s iterationCounter=%s totalInserted=%s " ~
661             "totalPopped=%s t0=%s now=%s", seed, numBins, numLevels, resolutionDur, iterationCounter, totalInserted,
662                 totalPopped, t0, now);
663 
664         void popReady(long advanceCycles) {
665             auto prevNow = now;
666             now += advanceCycles;
667             uint numPopped = 0;
668             Entry* e;
669             while ((e = ctq.pop(now)) !is null) {
670                 assert (e.timePoint <= now, "tp=%s prevNow=%s now=%s".format(e.timePoint, prevNow, now));
671                 //assert (e.timePoint/ctq.baseFrequencyCyclesDenom >= prevNow/ctq.baseFrequencyCyclesDenom, "tp=%s prevNow=%s now=%s".format(e.timePoint, prevNow, now));
672                 numPopped++;
673                 pool.release(e);
674             }
675             //writefln("%8d..%8d: %s", (prevNow - t0) / cyclesPerUsec, (now - t0) / cyclesPerUsec, numPopped);
676             totalPopped += numPopped;
677         }
678 
679         while (now < end) {
680             while (pool.numAvailable > 0) {
681                 auto e = pool.alloc();
682                 e.timePoint = TscTimePoint(uniform(now.cycles - before, min(end.cycles, now.cycles + ahead), rand));
683                 e.counter = totalInserted++;
684                 //writefln("insert[%s] at %s", e.counter, (e.timePoint - t0) / cyclesPerUsec);
685                 ctq.insert(e);
686             }
687             auto us = uniform(0, 130, rand);
688             if (us > 120) {
689                 us = uniform(100, 1500, rand);
690             }
691             popReady(us * cyclesPerUsec);
692             iterationCounter++;
693         }
694         popReady(ahead + ctq.resolutionCycles);
695         auto covered = ctq.baseTimes[0].diff!"cycles"(t0) / double(cyclesPerSecond);
696         auto expectedCover = span.total!"msecs" * (2.5 / 1000);
697         assert (covered >= expectedCover - 2, "%s %s".format(covered, expectedCover));
698 
699         writeln(totalInserted, " ", totalPopped, " ", ctq.stats);
700         foreach(i, s; ctq.stats[0..numLevels]) {
701             assert (s > 0, "level %s never received events".format(i));
702         }
703 
704         assert (totalInserted - totalPopped == pool.numInUse, "(1) pool.used=%s inserted=%s popped=%s".format(pool.numInUse, totalInserted, totalPopped));
705         assert (totalInserted == totalPopped, "(2) pool.used=%s inserted=%s popped=%s".format(pool.numInUse, totalInserted, totalPopped));
706         assert (totalInserted > numElems * 2, "totalInserted=%s".format(totalInserted));
707     }
708 
709     int numRuns = 0;
710     foreach(numElems; [10_000 /+, 300, 1000, 4000, 5000+/]) {
711         // spans 878s
712         testCTQ!(256, 3, 10_000)(50.usecs);
713         numRuns++;
714     }
715     assert (numRuns > 0);
716 }
717 
718 unittest {
719     META!"UT testing time queue with actual times"();
720     import mecca.log;
721     import std..string;
722 
723     static struct Entry {
724         TscTimePoint timePoint;
725         string name;
726         Entry* _next;
727         Entry* _prev;
728     }
729 
730     // The actual resolution is going to be slightly different than this, but it should only be more accurate, never less.
731     enum resolution = dur!"msecs"(1);
732     enum numBins = 16;
733     enum numLevels = 3;
734 
735     TscTimePoint now = TscTimePoint.hardNow;
736 
737     CascadingTimeQueue!(Entry*, numBins, numLevels) ctq;
738     ctq.open(resolution, now);
739 
740     Entry[6] entries;
741     enum L0Duration = resolution * numBins;
742     enum L1Duration = L0Duration * numBins;
743 
744     entries[0] = Entry(now + resolution /2, "e0 l0 b1");
745     entries[1] = Entry(now + resolution + 2, "e1 l0 b2");
746     entries[2] = Entry(now + resolution * 10 + 3, "e2 l0 b11");
747     entries[3] = Entry(now + L0Duration + resolution * 3 + 2, "e3 l1 b0 l0 b4");
748     entries[4] = Entry(now + L0Duration*7 + resolution * 5 + 7, "e4 l1 b6 l0 b6");
749     entries[5] = Entry(now + L1Duration + 14, "e5 l2 b0 l0 b1");
750 
751     foreach( ref e; entries ) {
752         INFO!"Entry %s at %s"( e.name, (e.timePoint - now).toString );
753     }
754 
755     // Insert out of order
756     ctq.insert(&entries[3]);
757     ctq.insert(&entries[1]);
758     ctq.insert(&entries[4]);
759     ctq.insert(&entries[5]);
760     ctq.insert(&entries[2]);
761     ctq.insert(&entries[0]);
762 
763     uint nextIdx = 0;
764 
765     assert(ctq.pop(now) is null, "First entry received too soon");
766 
767     auto base = now;
768     while(nextIdx<entries.length) {
769         auto step = ctq.cyclesTillNextEntry(now);
770         now += step;
771         DEBUG!"Setting time forward by %s to %s"(TscTimePoint.toDuration(step).toString, (now - base).toString);
772         Entry* e = ctq.pop(now);
773 
774         if( e !is null ) {
775             INFO!"Got entry %s from queue"(e.name);
776             assert( e.name == entries[nextIdx].name, "Pop returned incorrect entry, expected %s, got %s".format(entries[nextIdx].name,
777                         e.name) );
778             assert( e.timePoint>(now - resolution) && e.timePoint<=now,
779                     "Pop returned entry %s at an incorrect time. Current %s expected %s".format(e.name, now-base, e.timePoint-base) );
780             nextIdx++;
781         } else {
782             DEBUG!"Got empty entry from queue"();
783         }
784     }
785 }
786 
787 /+
788 unittest {
789     META!"UT testing run with a reactor"();
790     import mecca.reactor;
791     Reactor.OpenOptions options;
792     options.timerGranularity = 100.usecs;
793 
794     int numRuns;
795 
796     FiberHandle handle;
797 
798     void looper() {
799         DEBUG!"Num runs %s"(numRuns);
800         if( ++numRuns > 100 )
801             theReactor.resumeFiber(handle);
802     }
803 
804     void testBody() {
805         handle = theReactor.currentFiberHandle();
806 
807         theReactor.registerRecurringTimer(dur!"seconds"(1), &looper);
808 
809         theReactor.suspendCurrentFiber();
810     }
811 
812     testWithReactor(&testBody, options);
813 
814     DEBUG!"Total runs %s"(numRuns);
815 }
816 +/
817 
818 unittest {
819     META!"UT testing random time queue operations"();
820     // Test the cascading
821     import std.random;
822 
823     Mt19937 random;
824     auto seed = unpredictableSeed;
825     random.seed(seed);
826     scope(failure) INFO!"Running with seed %s"(seed);
827 
828     enum long resolution = 4;
829     enum numBins = 4;
830     enum numLevels = 4;
831 
832     struct Entry {
833         TscTimePoint timePoint;
834         uint id;
835         Entry* _next;
836         Entry* _prev;
837     }
838 
839     TscTimePoint[uint] entries;
840     uint nextId;
841 
842     CascadingTimeQueue!(Entry*, numBins, numLevels) ctq;
843     auto now = TscTimePoint(0);
844     ctq.open(resolution, now);
845     size_t numEntriesInQueue;
846 
847     void insert() {
848         Entry* entry;
849         entry = new Entry;
850 
851         uint level = uniform(0, numLevels-1, random);
852         ulong range = 0;
853         foreach( l; 0..level+1 ) {
854             range = resolution * ctq.binsInLevel(level);
855         }
856 
857         entry.timePoint = TscTimePoint( now.cycles + uniform(0, range, random) );
858         entry.id = nextId++;
859 
860         entries[entry.id] = entry.timePoint;
861 
862         DEBUG!"Pushing entry %s timepoint %s"(entry.id, entry.timePoint);
863         ctq.insert(entry);
864         numEntriesInQueue++;
865         assertEQ( ctq.length, numEntriesInQueue );
866     }
867 
868     void popAll() {
869         Entry* entry;
870         while( (entry = ctq.pop(now)) !is null ) {
871             DEBUG!"At %s popped entry %s timepoint %s"(now, entry.id, entry.timePoint);
872             numEntriesInQueue--;
873             assertEQ( numEntriesInQueue, ctq.length );
874             assert( entries[entry.id] == entry.timePoint );
875             entries.remove(entry.id);
876 
877             // Make sure the time is correct
878             ASSERT!"entry %s popped at incorrect time. %s<%s<%s"(
879                     entry.timePoint>TscTimePoint(now.cycles - resolution) &&
880                     entry.timePoint<TscTimePoint(now.cycles+resolution),
881                     entry.id,
882                     TscTimePoint(now.cycles - resolution),
883                     entry.timePoint,
884                     TscTimePoint(now.cycles + resolution));
885         }
886     }
887 
888     foreach(i; 0..10) {
889         insert();
890     }
891 
892     while( entries.length > 0 ) {
893         popAll();
894 
895         if( nextId<1000 ) {
896             insert();
897             insert();
898         }
899         auto oldNow = now;
900         now = TscTimePoint( now.cycles + ctq.cyclesTillNextEntry(now) );
901         DEBUG!"Advanced from %s to %s phase %s(%s)"(oldNow, now, ctq.phaseInLevel(0), ctq.phase);
902     }
903 
904     assertEQ( ctq.length, 0 );
905 }
906 
907 unittest {
908     // Expose a specific problem not visible under random testing
909     META!"UT for specific problems found"();
910 
911     enum long resolution = 10;
912     enum numBins = 4;
913     enum numLevels = 3;
914 
915     struct Entry {
916         TscTimePoint timePoint;
917         uint id;
918         Entry* _next;
919         Entry* _prev;
920     }
921 
922     TscTimePoint[uint] entries;
923     uint nextId;
924 
925     CascadingTimeQueue!(Entry*, numBins, numLevels) ctq;
926     auto now = TscTimePoint(0);
927     ctq.open(resolution, now);
928 
929     void insert(TscTimePoint time) {
930         Entry* entry;
931         entry = new Entry;
932 
933         entry.timePoint = time;
934         entry.id = nextId++;
935 
936         entries[entry.id] = entry.timePoint;
937 
938         DEBUG!"Pushing entry %s timepoint %s"(entry.id, entry.timePoint);
939         ctq.insert(entry);
940     }
941 
942     void validate(Entry* entry) {
943         DEBUG!"At %s popped entry %s timepoint %s"(now, entry.id, entry.timePoint);
944         assert( entries[entry.id] == entry.timePoint );
945         entries.remove(entry.id);
946     }
947 
948     bool popOne() {
949         return .popOne(ctq, now, &validate);
950     }
951 
952     void popAll() {
953         while( !popOne() ) {
954             wait(ctq, now);
955         }
956     }
957 
958     // insert a point that goes in the first bin of the second level
959     insert( TscTimePoint(9) );
960     popOne();
961     //now += 9; // Give it a time point that is almost, but not quite, to the next bucket
962     Entry* tmpRes = ctq.pop(now); // This should not return anything
963     assert( tmpRes is null );
964 
965     /* Now insert something that is distant enough from our current time to be in the first bucket of the second level,
966        but from the starting point should go into the second bucket of the second level.
967      */
968     insert( TscTimePoint(91) );
969     insert( TscTimePoint(99) );
970     popOne();
971     popOne();
972 }
973 
974 unittest {
975     META!"UT for overflow support"();
976 
977     enum long resolution = 4;
978     enum numBins = 4;
979     enum numLevels = 2;
980 
981     struct Entry {
982         TscTimePoint timePoint;
983         Entry* _next;
984         Entry* _prev;
985     }
986 
987     Entry[] entries;
988 
989     entries.reserve(10);
990     CascadingTimeQueue!(Entry*, numBins, numLevels) ctq;
991     auto now = TscTimePoint(0);
992     ctq.open(resolution, now);
993     scope(exit) ctq.close();
994 
995     void insert(TscTimePoint point) {
996         entries ~= Entry(point);
997         ctq.insert(&entries[$-1]);
998     }
999 
1000     void popOne() {
1001         while( !.popOne(ctq, now) ) {
1002             wait(ctq, now);
1003         }
1004     }
1005 
1006     insert(TscTimePoint(74)); // Level 1 bin 3 => L0 b3
1007     insert(TscTimePoint(101)); // Level 2 => L1 b1
1008 
1009     popOne();
1010     auto timeToWait = ctq.binsTillNextEntry();
1011     assert( timeToWait!=ulong.max, "CTQ failed to account for entry in overflow" );
1012 
1013     // After the pop pointer has moved, insert two points before and after the second point
1014     insert(TscTimePoint(90)); // L1 b0
1015     insert(TscTimePoint(120)); // L1 b2
1016 
1017     popOne();
1018     popOne();
1019     popOne();
1020 
1021     timeToWait = ctq.binsTillNextEntry();
1022     assertEQ( timeToWait, ulong.max, "Empty CTQ does not return infinite wait" );
1023 }