1 /// allows throttling the rate at which operations are done
2 module mecca.reactor.sync.throttler;
3 
4 // Licensed under the Boost license. Full copyright information in the AUTHORS file
5 
6 import mecca.lib.division;
7 import mecca.lib.exception;
8 import mecca.lib.time;
9 import mecca.log;
10 import mecca.reactor;
11 import mecca.reactor.sync.fiber_queue;
12 
/**
 * Main throttler implementation
 *
 * Implements <a href="https://en.wikipedia.org/wiki/Token_bucket">Token Bucket</a> QoS. Tokens are deposited into a bucket at a fixed rate.
 * Consumers request withdrawal of tokens from the bucket. If the bucket does not have enough tokens, the request to withdraw is paused
 * until such time as the bucket, again, has enough to allow the request to move forward.
 *
 * The bucket size controls the burst rate, i.e. - the amount of tokens that can be withdrawn without wait after a long quiet period.
 *
 * The throttler is strict first-come first-serve.
 *
 * Params:
 * AllowOverdraft = there are two variants to the throttler. In the first (and default) variant, the token balance must be able to fully
 * cover the current request. The second variant allows a request to proceed even if there are not enough tokens at the moment
 * (overdraw), so long as all previous debt has been repaid.
 */
struct ThrottlerImpl(bool AllowOverdraft = false) {
private:
    // Current number of tokens in the bucket. The value long.min doubles as the "throttler is closed" marker (see isOpen).
    long tokenBallance = tokenBallance.min; // Can be negative IFF AllowOverdraft is true
    // Time point up to which elapsed cycles have already been converted into tokens.
    TscTimePoint lastDepositTime;
    // Number of TSC cycles that earn a single token.
    ulong ticksPerToken;
    // Precomputed divisor for ticksPerToken, used by deposit().
    S64Divisor ticksPerTokenDivider;
    // Fibers waiting for their turn to become the first in line.
    FiberQueue waiters;
    // Maximal number of tokens the bucket may hold.
    ulong burstSize;
    // Sum of the tokens requested by all fibers currently inside withdraw (both waiting and first in line).
    ulong requestedTokens;

public:
    /**
     * initialize a throttler for use.
     *
     * Params:
     * tokensPerSecond = the rate at which new tokens are deposited at the bucket. Must be greater than zero and no greater than the
     * number of TSC cycles per second. Actual rate might vary slightly due to rounding errors. In general, the lower the number, the
     * lower the error.
     * burstSize = the maximal number of tokens that the bucket may hold. Unless overdraft is allowed, this is also the maximal amount
     * that a single withdrawal may request. Must fit in a signed 64 bit integer.
     * numInitialTokens = the number of tokens initially in the bucket. If unspecified, the bucket starts out as completely full.
     */
    void open(size_t tokensPerSecond, ulong burstSize) nothrow @safe @nogc {
        open(tokensPerSecond, burstSize, burstSize);
    }

    /// ditto
    void open(size_t tokensPerSecond, ulong burstSize, ulong numInitialTokens) nothrow @safe @nogc {
        ASSERT!"Throttler does not allow overdraft but has no burst buffer to withdraw from"(burstSize>0 || AllowOverdraft);
        // The balance is kept in a signed long. A larger bucket would wrap around to a negative cap when cast.
        ASSERT!"Throttler burst size %s does not fit in a signed 64 bit balance"(burstSize<=cast(ulong)long.max, burstSize);
        ASSERT!"Can't deposit %s tokens to throttler with burst bucket of %s"
                (numInitialTokens<=burstSize, numInitialTokens, burstSize);
        // Guard the division below.
        ASSERT!"Throttler must be opened with a positive tokens per second rate"(tokensPerSecond>0);

        this.burstSize = burstSize;
        tokenBallance = numInitialTokens;
        lastDepositTime = TscTimePoint.hardNow();
        ticksPerToken = TscTimePoint.cyclesPerSecond / tokensPerSecond;
        // A rate higher than the TSC frequency rounds down to zero cycles per token, which would mean a zero divisor and
        // zero length sleeps while waiting for tokens.
        ASSERT!"Throttler rate of %s tokens per second is higher than the timer resolution"(ticksPerToken>0, tokensPerSecond);
        ticksPerTokenDivider = S64Divisor(ticksPerToken);
    }

    /// Closes the throttler. Closing a throttler that is not open is a no-op.
    void close() nothrow @safe @nogc {
        if( !isOpen )
            return;

        // NOTE(review): fibers resumed here continue inside withdraw and will call deposit() on a closed throttler. Callers
        // should presumably make sure no fiber is waiting when closing - confirm.
        waiters.resumeAll();
        lastDepositTime = TscTimePoint.min;
        tokenBallance = tokenBallance.min;
    }

    ~this() nothrow @safe @nogc {
        ASSERT!"open throttler destructed"( !isOpen );
    }

    /// reports whether the throttler is open.
    @property bool isOpen() pure const nothrow @safe @nogc {
        return tokenBallance !is tokenBallance.min;
    }

    /**
     * Withdraw tokens from the bucket.
     *
     * Unless AllowOverdraft is true, the amount of tokens requested must not exceed the burst size. If AllowOverdraft is false and
     * there is insufficient balance, or if AllowOverdraft is true but the balance is negative, then the requester is paused until all
     * prior requesters have been served $(B and) the balance is high enough to serve the current request.
     *
     * Params:
     * tokens = number of tokens to withdraw.
     * timeout = sets a timeout for the wait.
     *
     * Throws:
     * TimeoutExpired if the timeout expires.
     *
     * Any other exception injected to this fiber using Reactor.throwInFiber
     */
    void withdraw(ulong tokens, Timeout timeout = Timeout.infinite) @safe @nogc {
        DBG_ASSERT!"Trying to withdraw from close throttler"(isOpen);
        ASSERT!"Trying to withdraw %s tokens from throttler that can only hold %s"(AllowOverdraft || tokens<=burstSize, tokens, burstSize);

        // Register our request for as long as we are inside this function, no matter how we leave it.
        requestedTokens += tokens;
        scope(exit) requestedTokens -= tokens;

        if( requestedTokens > tokens ) {
            // There are other waiters. Wait until this fiber is the next to withdraw
            waiters.suspend(timeout);
        }

        try {
            // We are the first in line to withdraw. No matter why we exit, wake the next in line
            scope(exit) waiters.resumeOne();

            deposit();
            while( !mayWithdraw(tokens) ) {
                // calcSleepDuration throws TimeoutExpired if the tokens cannot be earned before the timeout expires
                theReactor.sleep( calcSleepDuration(tokens, timeout) );
                deposit();
            }

            tokenBallance -= tokens;
        } catch(TimeoutExpired ex) {
            // We know we won't make it even before the timeout actually passes. We delay throwing the reactor timeout until the timeout
            // actually transpired (you never know who depends on this in some weird way), but we do release any other waiter in queue to
            // get a chance at obtaining the lock immediately.
            ERROR!"Fiber will not have enough tokens in time for timeout expirey %s. Wait until it actually expires before throwing"
                    (timeout);
            theReactor.sleep(timeout);
            throw ex;
        }
    }

private:
    // Converts the cycles that passed since lastDepositTime into whole tokens and adds them to the bucket, capping at burstSize.
    void deposit() nothrow @safe @nogc {
        auto now = TscTimePoint.now;
        long cyclesPassed = now.cycles - lastDepositTime.cycles;
        long tokensEarned = cyclesPassed / ticksPerTokenDivider;

        // To avoid drift, effective cycles are only those that earned us whole tokens.
        cyclesPassed = tokensEarned * ticksPerToken;

        lastDepositTime += cyclesPassed;
        tokenBallance += tokensEarned;
        if( tokenBallance>cast(long)(burstSize) )
            tokenBallance = burstSize;
    }

    // Whether a request for `tokens` may proceed right now: with overdraft any non-negative balance will do, otherwise the
    // balance must cover the whole request.
    bool mayWithdraw(ulong tokens) nothrow @safe @nogc {
        return tokenBallance >= (AllowOverdraft ? 0 : cast(long)(tokens));
    }

    // Calculates how long to sleep until the missing tokens have been earned. Throws TimeoutExpired if that point in time is
    // past the timeout's expiry. Must only be called when the withdrawal cannot proceed immediately.
    Duration calcSleepDuration(ulong tokens, Timeout timeout) @safe @nogc {
        DBG_ASSERT!"calcSleepDuration called, but can withdraw right now"( !mayWithdraw(tokens) );
        long numMissingTokens = (AllowOverdraft ? 0 : tokens) - tokenBallance;
        DBG_ASSERT!"negative missing %s: requested %s have %s"(numMissingTokens>0, numMissingTokens, tokens, tokenBallance);

        auto sleepDuration = TscTimePoint.toDuration( numMissingTokens * ticksPerToken );
        if( TscTimePoint.now + sleepDuration > timeout.expiry )
            throw mkEx!TimeoutExpired;
        return sleepDuration;
    }
}
165 
/// Strict throttler: a withdrawal proceeds only once the balance fully covers it. Use this type when applicable.
alias Throttler = ThrottlerImpl!(false);
/// Throttler variant that lets a withdrawal overdraw the bucket, provided all previous debt has been repaid.
alias ThrottlerOverdraft = ThrottlerImpl!(true);
170 
// Runs five fibers that withdraw different packet sizes from one shared throttler, and checks that the total run time
// matches the configured token rate.
unittest {
    import std.string : format;

    import mecca.reactor: testWithReactor, theReactor;
    import mecca.reactor.sync.barrier;

    Throttler budget;
    uint numDone;
    Barrier doneEvent;

    // Tokens per second deposited into the bucket
    enum NETWORK_BUDGET = 12800;

    // Withdraws NUM_PAGES tokens NUM_ITERATIONS times, yielding between withdrawals, then reports completion
    void loader(uint NUM_PAGES, uint NUM_ITERATIONS)() {
        foreach(iteration; 0..NUM_ITERATIONS) {
            budget.withdraw(NUM_PAGES);
            theReactor.yield();
        }

        numDone++;
        doneEvent.markDone();
    }

    testWithReactor({
        auto startTime = TscTimePoint.hardNow();
        budget.open(NETWORK_BUDGET, 256);
        scope(exit) budget.close();

        theReactor.spawnFiber(&loader!(150, 100)); // Big packets
        doneEvent.addWaiter();
        theReactor.spawnFiber(&loader!(3, 1000)); // Small packets
        doneEvent.addWaiter();
        theReactor.spawnFiber(&loader!(3, 1000)); // Small packets
        doneEvent.addWaiter();
        theReactor.spawnFiber(&loader!(2, 1000)); // Small packets
        doneEvent.addWaiter();
        theReactor.spawnFiber(&loader!(26, 100)); // Medium packets
        doneEvent.addWaiter();
        // All together: 25,600 tokens, which at NETWORK_BUDGET (12,800 tokens per second) equals two seconds

        doneEvent.waitAll();
        auto endTime = TscTimePoint.hardNow();

        auto duration = endTime - startTime;

        DEBUG!"Test took %s"(duration.toString());
        assert( duration>=dur!"msecs"(1900), format("Test should take no less than 1.9 seconds, took %s", duration.toString()) );
        assert( duration<dur!"msecs"(2200), format("Test should take no more than 2.2 seconds, took %s",
                    duration.toString()) );
    });
}