1 /// allows throttling the rate at which operations are done 2 module mecca.reactor.sync.throttler; 3 4 // Licensed under the Boost license. Full copyright information in the AUTHORS file 5 6 import mecca.lib.division; 7 import mecca.lib.exception; 8 import mecca.lib.time; 9 import mecca.log; 10 import mecca.reactor; 11 import mecca.reactor.sync.fiber_queue; 12 13 /** 14 * Main throttler implementation 15 * 16 * Implements <a href="https://en.wikipedia.org/wiki/Token_bucket">Token Bucket</a> QoS. Tokens are deposited into a bucket at a fixed rate. 17 * Consumers request withdrawl of tokens from the bucket. If the bucket does not have enough tokens, the request to withdraw is paused until 18 * such time as the bucket, again, has enough to allow the request to move forward. 19 * 20 * The bucket size controls the burst rate, i.e. - the amount of tokens that can be withdrawn without wait after a long quiet period. 21 * 22 * The throttler is strict first-come first-serve. 23 * 24 * Params: 25 * AllowOverdraft = there are two variants to the throttler. In the first (and default) variant, the tokens ballance must be able to fully 26 * cover the current request. The second variant allows a request to proceed even if there is not enough tokens at the moment (overdraw), 27 * so long as all previous debt has been repayed. 28 */ 29 struct ThrottlerImpl(bool AllowOverdraft = false) { 30 private: 31 long tokenBallance = tokenBallance.min; // Can be negative IFF AllowOverdraft is true 32 TscTimePoint lastDepositTime; 33 ulong ticksPerToken; 34 S64Divisor ticksPerTokenDivider; 35 FiberQueue waiters; 36 ulong burstSize; 37 ulong requestedTokens; 38 39 public: 40 /** 41 * initialize a throttler for use. 42 * 43 * Params: 44 * tokensPerSecond = the rate at which new tokens are deposited at the bucket. Actual rate might vary slightly due to rounding errors. 45 * In general, the lower the number, the lower the error. 46 * burstSize = the maximal number of tokens that the bucket may hold. Unless overdraft is allowed, this is also the maximal amount 47 * that a single withdrawl may request. 48 * numInitialTokens = the number of tokens initially in the bucket. If unspecified, the bucket starts out as completely full. 49 */ 50 void open(size_t tokensPerSecond, ulong burstSize) nothrow @safe @nogc { 51 open(tokensPerSecond, burstSize, burstSize); 52 } 53 54 /// ditto 55 void open(size_t tokensPerSecond, ulong burstSize, ulong numInitialTokens) nothrow @safe @nogc { 56 ASSERT!"Throttler does not allowg overdraft but has no burst buffer to withdraw from"(burstSize>0 || AllowOverdraft); 57 ASSERT!"Can't deposit %s tokens to throttler with burst bucket of %s" 58 (numInitialTokens<=cast(long)burstSize, numInitialTokens, burstSize); 59 this.burstSize = burstSize; 60 tokenBallance = numInitialTokens; 61 lastDepositTime = TscTimePoint.hardNow(); 62 ticksPerToken = TscTimePoint.cyclesPerSecond / tokensPerSecond; 63 ticksPerTokenDivider = S64Divisor(ticksPerToken); 64 } 65 66 /// Closes the throtller. 67 void close() nothrow @safe @nogc { 68 if( !isOpen ) 69 return; 70 71 waiters.resumeAll(); 72 lastDepositTime = TscTimePoint.min; 73 tokenBallance = tokenBallance.min; 74 } 75 76 ~this() nothrow @safe @nogc { 77 ASSERT!"open throttler destructed"( !isOpen ); 78 } 79 80 /// reports whether open the throttler is open. 81 @property bool isOpen() pure const nothrow @safe @nogc { 82 return tokenBallance !is tokenBallance.min; 83 } 84 85 /** 86 * Withdraw tokens from the bucket. 87 * 88 * Unless AllowOverdraft is true, the amount of tokens requested must be smaller than the burst size. If AllowOverdraft is false and 89 * there is insufficient ballance, or if AllowOverdraft is true but the ballance is negative, then the requester is paused until all 90 * prior reuqesters have been served $(B and) the ballance is high enough to serve the current request. 91 * 92 * Params: 93 * tokens = number of tokens to withdraw. 94 * timeout = sets a timeout for the wait. 95 * 96 * Throws: 97 * TimeoutExpired if the timeout expires. 98 * 99 * Any other exception injected to this fiber using Reactor.throwInFiber 100 */ 101 void withdraw(ulong tokens, Timeout timeout = Timeout.infinite) @safe @nogc { 102 DBG_ASSERT!"Trying to withdraw from close throttler"(isOpen); 103 ASSERT!"Trying to withdraw %s tokens from throttler that can only hold %s"(AllowOverdraft || tokens<=burstSize, tokens, burstSize); 104 105 requestedTokens += tokens; 106 scope(exit) requestedTokens -= tokens; 107 108 if( requestedTokens > tokens ) { 109 // There are other waiters. Wait until this fiber is the next to withdraw 110 waiters.suspend(timeout); 111 } 112 113 try { 114 // We are the first in line to withdraw. No matter why we exit, wake the next in line 115 scope(exit) waiters.resumeOne(); 116 117 deposit(); 118 while( !mayWithdraw(tokens) ) { 119 theReactor.sleep( calcSleepDuration(tokens, timeout) ); 120 deposit(); 121 } 122 123 tokenBallance -= tokens; 124 } catch(TimeoutExpired ex) { 125 // We know we won't make it even before the timeout actually passes. We delay throwing the reactor timeout until the timeout 126 // actually transpired (you never know who depends on this in some weird way), but we do release any other waiter in queue to 127 // get a chance at obtaining the lock immediately. 128 ERROR!"Fiber will not have enough tokens in time for timeout expirey %s. Wait until it actually expires before throwing" 129 (timeout); 130 theReactor.sleep(timeout); 131 throw ex; 132 } 133 } 134 135 private: 136 void deposit() nothrow @safe @nogc { 137 auto now = TscTimePoint.now; 138 long cyclesPassed = now.cycles - lastDepositTime.cycles; 139 long tokensEarned = cyclesPassed / ticksPerTokenDivider; 140 141 // To avoid drift, effective cycles are only those that earned us whole tokens. 142 cyclesPassed = tokensEarned * ticksPerToken; 143 144 lastDepositTime += cyclesPassed; 145 tokenBallance += tokensEarned; 146 if( tokenBallance>cast(long)(burstSize) ) 147 tokenBallance = burstSize; 148 } 149 150 bool mayWithdraw(ulong tokens) nothrow @safe @nogc { 151 return tokenBallance >= (AllowOverdraft ? 0 : cast(long)(tokens)); 152 } 153 154 Duration calcSleepDuration(ulong tokens, Timeout timeout) @safe @nogc { 155 DBG_ASSERT!"calcSleepDuration called, but can withdraw right now"( !mayWithdraw(tokens) ); 156 long numMissingTokens = (AllowOverdraft ? 0 : tokens) - tokenBallance; 157 DBG_ASSERT!"negative missing %s: requested %s have %s"(numMissingTokens>0, numMissingTokens, tokens, tokenBallance); 158 159 auto sleepDuration = TscTimePoint.toDuration( numMissingTokens * ticksPerToken ); 160 if( TscTimePoint.now + sleepDuration > timeout.expiry ) 161 throw mkEx!TimeoutExpired; 162 return sleepDuration; 163 } 164 } 165 166 /// Standard throttler. Use this type when applicable. 167 alias Throttler = ThrottlerImpl!false; 168 /// Throttler allowing overdrawing tokens. 169 alias ThrottlerOverdraft = ThrottlerImpl!true; 170 171 unittest { 172 import std.uuid; 173 import std..string : format; 174 175 import mecca.reactor: testWithReactor, theReactor; 176 import mecca.reactor.sync.barrier; 177 178 Throttler budget; 179 uint numDone; 180 Barrier doneEvent; 181 182 enum NETWORK_BUDGET = 12800; 183 184 void loader(uint NUM_PAGES, uint NUM_ITERATIONS)() { 185 foreach(iteration; 0..NUM_ITERATIONS) { 186 budget.withdraw(NUM_PAGES); 187 theReactor.yield(); 188 } 189 190 numDone++; 191 doneEvent.markDone(); 192 } 193 194 testWithReactor({ 195 auto startTime = TscTimePoint.hardNow(); 196 budget.open(NETWORK_BUDGET, 256); 197 scope(exit) budget.close(); 198 199 theReactor.spawnFiber(&loader!(150, 100)); // Big packets 200 doneEvent.addWaiter(); 201 theReactor.spawnFiber(&loader!(3, 1000)); // Small packets 202 doneEvent.addWaiter(); 203 theReactor.spawnFiber(&loader!(3, 1000)); // Small packets 204 doneEvent.addWaiter(); 205 theReactor.spawnFiber(&loader!(2, 1000)); // Small packets 206 doneEvent.addWaiter(); 207 theReactor.spawnFiber(&loader!(26, 100)); // Medium packets 208 doneEvent.addWaiter(); 209 // All together: 25,600 packets, which at 50MB/s equals two seconds 210 211 doneEvent.waitAll(); 212 auto endTime = TscTimePoint.hardNow(); 213 214 auto duration = endTime - startTime; 215 216 DEBUG!"Test took %s"(duration.toString()); 217 assert( duration>=dur!"msecs"(1900), format("Test should take no less than 1.9 seconds, took %s", duration.toString()) ); 218 assert( (endTime-startTime)<dur!"msecs"(2200), format("Test should take no more than 2.2 seconds, took %s", 219 duration.toString()) ); 220 }); 221 }