1 /// allows throttling the rate at which operations are done2 modulemecca.reactor.sync.throttler;
3 4 // Licensed under the Boost license. Full copyright information in the AUTHORS file5 6 importmecca.lib.division;
7 importmecca.lib.exception;
8 importmecca.lib.time;
9 importmecca.log;
10 importmecca.reactor;
11 importmecca.reactor.sync.fiber_queue;
12 13 /**
14 * Main throttler implementation
15 *
16 * Implements <a href="https://en.wikipedia.org/wiki/Token_bucket">Token Bucket</a> QoS. Tokens are deposited into a bucket at a fixed rate.
17 * Consumers request withdrawl of tokens from the bucket. If the bucket does not have enough tokens, the request to withdraw is paused until
18 * such time as the bucket, again, has enough to allow the request to move forward.
19 *
20 * The bucket size controls the burst rate, i.e. - the amount of tokens that can be withdrawn without wait after a long quiet period.
21 *
22 * The throttler is strict first-come first-serve.
23 *
24 * Params:
25 * AllowOverdraft = there are two variants to the throttler. In the first (and default) variant, the tokens ballance must be able to fully
26 * cover the current request. The second variant allows a request to proceed even if there is not enough tokens at the moment (overdraw),
27 * so long as all previous debt has been repayed.
28 */29 structThrottlerImpl(boolAllowOverdraft = false) {
30 private:
31 longtokenBallance = tokenBallance.min; // Can be negative IFF AllowOverdraft is true32 TscTimePointlastDepositTime;
33 ulongticksPerToken;
34 S64DivisorticksPerTokenDivider;
35 FiberQueuewaiters;
36 ulongburstSize;
37 ulongrequestedTokens;
38 39 public:
40 /**
41 * initialize a throttler for use.
42 *
43 * Params:
44 * tokensPerSecond = the rate at which new tokens are deposited at the bucket. Actual rate might vary slightly due to rounding errors.
45 * In general, the lower the number, the lower the error.
46 * burstSize = the maximal number of tokens that the bucket may hold. Unless overdraft is allowed, this is also the maximal amount
47 * that a single withdrawl may request.
48 * numInitialTokens = the number of tokens initially in the bucket. If unspecified, the bucket starts out as completely full.
49 */50 voidopen(size_ttokensPerSecond, ulongburstSize) nothrow @safe @nogc {
51 open(tokensPerSecond, burstSize, burstSize);
52 }
53 54 /// ditto55 voidopen(size_ttokensPerSecond, ulongburstSize, ulongnumInitialTokens) nothrow @safe @nogc {
56 ASSERT!"Throttler does not allowg overdraft but has no burst buffer to withdraw from"(burstSize>0 || AllowOverdraft);
57 ASSERT!"Can't deposit %s tokens to throttler with burst bucket of %s"58 (numInitialTokens<=cast(long)burstSize, numInitialTokens, burstSize);
59 this.burstSize = burstSize;
60 tokenBallance = numInitialTokens;
61 lastDepositTime = TscTimePoint.hardNow();
62 ticksPerToken = TscTimePoint.cyclesPerSecond / tokensPerSecond;
63 ticksPerTokenDivider = S64Divisor(ticksPerToken);
64 }
65 66 /// Closes the throtller.67 voidclose() nothrow @safe @nogc {
68 if( !isOpen )
69 return;
70 71 waiters.resumeAll();
72 lastDepositTime = TscTimePoint.min;
73 tokenBallance = tokenBallance.min;
74 }
75 76 ~this() nothrow @safe @nogc {
77 ASSERT!"open throttler destructed"( !isOpen );
78 }
79 80 /// reports whether open the throttler is open.81 @propertyboolisOpen() pureconstnothrow @safe @nogc {
82 returntokenBallance !istokenBallance.min;
83 }
84 85 /**
86 * Withdraw tokens from the bucket.
87 *
88 * Unless AllowOverdraft is true, the amount of tokens requested must be smaller than the burst size. If AllowOverdraft is false and
89 * there is insufficient ballance, or if AllowOverdraft is true but the ballance is negative, then the requester is paused until all
90 * prior reuqesters have been served $(B and) the ballance is high enough to serve the current request.
91 *
92 * Params:
93 * tokens = number of tokens to withdraw.
94 * timeout = sets a timeout for the wait.
95 *
96 * Throws:
97 * TimeoutExpired if the timeout expires.
98 *
99 * Any other exception injected to this fiber using Reactor.throwInFiber
100 */101 voidwithdraw(ulongtokens, Timeouttimeout = Timeout.infinite) @safe @nogc {
102 DBG_ASSERT!"Trying to withdraw from close throttler"(isOpen);
103 ASSERT!"Trying to withdraw %s tokens from throttler that can only hold %s"(AllowOverdraft || tokens<=burstSize, tokens, burstSize);
104 105 requestedTokens += tokens;
106 scope(exit) requestedTokens -= tokens;
107 108 if( requestedTokens > tokens ) {
109 // There are other waiters. Wait until this fiber is the next to withdraw110 waiters.suspend(timeout);
111 }
112 113 try {
114 // We are the first in line to withdraw. No matter why we exit, wake the next in line115 scope(exit) waiters.resumeOne();
116 117 deposit();
118 while( !mayWithdraw(tokens) ) {
119 theReactor.sleep( calcSleepDuration(tokens, timeout) );
120 deposit();
121 }
122 123 tokenBallance -= tokens;
124 } catch(TimeoutExpiredex) {
125 // We know we won't make it even before the timeout actually passes. We delay throwing the reactor timeout until the timeout126 // actually transpired (you never know who depends on this in some weird way), but we do release any other waiter in queue to127 // get a chance at obtaining the lock immediately.128 ERROR!"Fiber will not have enough tokens in time for timeout expirey %s. Wait until it actually expires before throwing"129 (timeout);
130 theReactor.sleep(timeout);
131 throwex;
132 }
133 }
134 135 private:
136 voiddeposit() nothrow @safe @nogc {
137 autonow = TscTimePoint.now;
138 longcyclesPassed = now.cycles - lastDepositTime.cycles;
139 longtokensEarned = cyclesPassed / ticksPerTokenDivider;
140 141 // To avoid drift, effective cycles are only those that earned us whole tokens.142 cyclesPassed = tokensEarned * ticksPerToken;
143 144 lastDepositTime += cyclesPassed;
145 tokenBallance += tokensEarned;
146 if( tokenBallance>cast(long)(burstSize) )
147 tokenBallance = burstSize;
148 }
149 150 boolmayWithdraw(ulongtokens) nothrow @safe @nogc {
151 returntokenBallance >= (AllowOverdraft ? 0 : cast(long)(tokens));
152 }
153 154 DurationcalcSleepDuration(ulongtokens, Timeouttimeout) @safe @nogc {
155 DBG_ASSERT!"calcSleepDuration called, but can withdraw right now"( !mayWithdraw(tokens) );
156 longnumMissingTokens = (AllowOverdraft ? 0 : tokens) - tokenBallance;
157 DBG_ASSERT!"negative missing %s: requested %s have %s"(numMissingTokens>0, numMissingTokens, tokens, tokenBallance);
158 159 autosleepDuration = TscTimePoint.toDuration( numMissingTokens * ticksPerToken );
160 if( TscTimePoint.now + sleepDuration > timeout.expiry )
161 throwmkEx!TimeoutExpired;
162 returnsleepDuration;
163 }
164 }
165 166 /// Standard throttler. Use this type when applicable.167 aliasThrottler = ThrottlerImpl!false;
168 /// Throttler allowing overdrawing tokens.169 aliasThrottlerOverdraft = ThrottlerImpl!true;
170 171 unittest {
172 importstd.uuid;
173 importstd..string : format;
174 175 importmecca.reactor: testWithReactor, theReactor;
176 importmecca.reactor.sync.barrier;
177 178 Throttlerbudget;
179 uintnumDone;
180 BarrierdoneEvent;
181 182 enumNETWORK_BUDGET = 12800;
183 184 voidloader(uintNUM_PAGES, uintNUM_ITERATIONS)() {
185 foreach(iteration; 0..NUM_ITERATIONS) {
186 budget.withdraw(NUM_PAGES);
187 theReactor.yield();
188 }
189 190 numDone++;
191 doneEvent.markDone();
192 }
193 194 testWithReactor({
195 autostartTime = TscTimePoint.hardNow();
196 budget.open(NETWORK_BUDGET, 256);
197 scope(exit) budget.close();
198 199 theReactor.spawnFiber(&loader!(150, 100)); // Big packets200 doneEvent.addWaiter();
201 theReactor.spawnFiber(&loader!(3, 1000)); // Small packets202 doneEvent.addWaiter();
203 theReactor.spawnFiber(&loader!(3, 1000)); // Small packets204 doneEvent.addWaiter();
205 theReactor.spawnFiber(&loader!(2, 1000)); // Small packets206 doneEvent.addWaiter();
207 theReactor.spawnFiber(&loader!(26, 100)); // Medium packets208 doneEvent.addWaiter();
209 // All together: 25,600 packets, which at 50MB/s equals two seconds210 211 doneEvent.waitAll();
212 autoendTime = TscTimePoint.hardNow();
213 214 autoduration = endTime - startTime;
215 216 DEBUG!"Test took %s"(duration.toString());
217 assert( duration>=dur!"msecs"(1900), format("Test should take no less than 1.9 seconds, took %s", duration.toString()) );
218 assert( (endTime-startTime)<dur!"msecs"(2200), format("Test should take no more than 2.2 seconds, took %s",
219 duration.toString()) );
220 });
221 }