1 /// Reactor aware semaphore
2 module mecca.reactor.sync.semaphore;
3 
4 // Licensed under the Boost license. Full copyright information in the AUTHORS file
5 
6 import std.algorithm;
7 
8 import mecca.lib.exception;
9 import mecca.lib.time;
10 import mecca.log;
11 import mecca.reactor;
12 import mecca.reactor.sync.fiber_queue;
13 
14 /// Reactor aware semaphore
15 struct Semaphore {
16 private:
17     /+ Semaphore fairness is assured using a two stage process.
18        During acquire, if there are other waiting acquirers, we place ourselves in the "waiters" queue.
19 
20        Once we're at the top of the queue, we register our fiber handle as primaryWaiter.
21      +/
22     size_t _capacity, pendingCapacity;
23     long available;     // May be negative due to capacity change
24     size_t requestsPending;
25     FiberQueue waiters;
26     FiberHandle primaryWaiter;
27     bool resumePending;         // Makes sure only one fiber gets woken up at any one time
28 
29 public:
30     @disable this(this);
31 
32     /**
33      * Construct a semaphore with given capacity.
34      */
35     this(size_t capacity, size_t used = 0) pure nothrow @safe @nogc {
36         open(capacity, used);
37     }
38 
39     /**
40      * Call this function before using the semaphore.
41      *
42      * You can skip calling `open` if the semaphore is constructed explicitly
43      *
44      * Params:
45      *  capacity = maximal value the semaphore can grow to.
46      *  used = initial number of obtained locks. Default of zero means that the semaphore is currently unused.
47      */
48     void open(size_t capacity, size_t used = 0) pure nothrow @safe @nogc {
49         ASSERT!"Semaphore.open called on already open semaphore"(_capacity==0);
50         ASSERT!"Semaphore.open called with capacity 0"(capacity>0);
51         ASSERT!"Semaphore.open called with initial used count %s greater than capacity %s"(used<=capacity, used, capacity);
52 
53         _capacity = capacity;
54         available = _capacity - used;
55         pendingCapacity = 0;
56         requestsPending = 0;
57         ASSERT!"open called with waiters in queue"( waiters.empty );
58     }
59 
60     /**
61      * Call this function when the semaphore is no longer needed.
62      *
63      * This function is mostly useful for unittests, where the same semaphore instance might be used multiple times.
64      */
65     void close() pure nothrow @safe @nogc {
66         ASSERT!"Semaphore.close called on a non-open semaphore"(_capacity > 0);
67         ASSERT!"Semaphore.close called while fibers are waiting on semaphore"(waiters.empty);
68         _capacity = 0;
69     }
70 
71     /// Report the capacity of the semaphore
72     @property size_t capacity() const pure nothrow @safe @nogc {
73         return _capacity;
74     }
75 
76     /** Change the capacity of the semaphore
77      *
78      * If `immediate` is set to `false` and the current `level` is higher than the requested capacity, `setCapacity`
79      * will sleep until the capacity can be cleared.
80      *
81      * If `immediate` is `false` then `setCapacity` may return before the new capacity is actually set. This will
82      * $(I only) happen if there is an older `setCapacity` call that has not yet finished.
83      *
84      * Params:
85      * newCapacity = The new capacity
86      * immediate = whether the new capacity takes effect immediately.
87      *
88      * Warnings:
89      * Setting the capacity to lower than the number of resources a waiting fiber is currently requesting is undefined.
90      *
91      * If `immediate` is set to `true`, it is possible for `level` to report a higher acquired count than `capacity`.
92      *
93      * If there is a chance that multiple calls to `setCapacity` are active at once, the `immeditate` flag must be set
94      * the same way on all of them. In other words, it is illegal to call `setCapacity` with `immediate` set to `false`,
95      * and then call `setCapacity` with `immediate` set to true before the first call returns.
96      */
97     void setCapacity(size_t newCapacity, bool immediate = false) @safe @nogc {
98         DBG_ASSERT!"setCapacity called with immediate set and a previous call still pending"(
99                 !immediate || pendingCapacity == 0 );
100 
101         if( (newCapacity>=_capacity && pendingCapacity==0) || immediate ) {
102             // Fast path
103             available += newCapacity - _capacity;
104             _capacity = newCapacity;
105 
106             // It is possible that a fiber that previously was blocked can now move forward
107             resumeOne(false);
108 
109             return;
110         }
111 
112         // We cannot complete the capacity change immediately
113         if( pendingCapacity!=0 ) {
114             // Just piggyback the pending capacity change
115             pendingCapacity = newCapacity;
116             return;
117         }
118 
119         // We need to wait ourselves for the change to be possible
120         pendingCapacity = newCapacity;
121         scope(exit) pendingCapacity = 0;
122 
123         while( pendingCapacity!=this.capacity ) {
124             assertEQ( newCapacity, pendingCapacity );
125 
126             // If we ask to reduce the capacity, and then reduce it again, we might need to run this loop more than once
127             assertLT( pendingCapacity, this.capacity, "pendingCapacity not <= level" );
128             size_t numAcquired = this.capacity - pendingCapacity;
129             acquire(numAcquired);
130             // It is never released as such. We instead manipulate the state accordingly
131 
132             // Calculate the capacity closest to the one we want we can saftly reach right now
133             newCapacity = max(newCapacity, pendingCapacity);
134 
135             _capacity = newCapacity;
136             if( newCapacity>this.capacity ) {
137                 // We need to increase the capcity.
138                 release(numAcquired);
139                 // This also releases any further clients waiting
140             } else {
141                 // available was already substracted by the acquire. Nothing more to do here
142             }
143 
144             newCapacity = pendingCapacity;
145         }
146     }
147 
148     /**
149      * Report the current amount of available resources.
150      *
151      * This amount includes all currently pending acquire requests.
152      */
153     @property size_t level() const pure nothrow @safe @nogc {
154         if( available < requestsPending )
155             return 0;
156 
157         return available - requestsPending;
158     }
159 
160     /**
161      * acquire resources from the semaphore.
162      *
163      * Acquire one or more "resources" from the semaphore. Sleep if not enough are available. The semaphore guarantees a
164      * strict FIFO. A new request, even if satifiable, will not be granted until all older requests are granted.
165      *
166      * Params:
167      *  amount = the amount of resources to request.
168      *  timeout = how long to wait for resources to become available.
169      *
170      * Throws:
171      *  TimeoutExpired if timeout has elapsed without satisfying the request.
172      *
173      *  Also, any other exception may be thrown if injected using theReactor.throwInFiber.
174      */
175     void acquire(size_t amount = 1, Timeout timeout = Timeout.infinite) @safe @nogc {
176         ASSERT!"Semaphore tried to acquire %s, but total capacity is only %s"( amount<=capacity, amount, capacity );
177         requestsPending += amount;
178         scope(exit) requestsPending -= amount;
179 
180         bool slept;
181         if( requestsPending>amount ) {
182             // There are others waiting before us
183             suspendSecondary(timeout);
184             slept = true;
185             // XXX Should we set the priority back so that all sleeps are the same priority?
186         }
187 
188         while( available<amount ) {
189             if( slept ) {
190                 DEBUG!"Spurious wakeup waiting to acquire %s from semaphore. Available %s, capacity %s"(
191                         amount, available, capacity);
192             }
193 
194             slept = true;
195             suspendPrimary(timeout);
196         }
197         // In case we didn't sleep
198         theReactor.assertMayContextSwitch();
199 
200         DBG_ASSERT!"Semaphore has %s requests pending including us, but we're requesting %s"(
201                 requestsPending >= amount, requestsPending, amount);
202 
203         available -= amount;
204         // requestsPending -= amount; Will be done by scope(exit) above. We're not sleeping until the end of the function
205 
206         if( requestsPending>amount && available>0 ) {
207             // If there are other pendings, we've slept. The same release that woke us up should have woken the next in
208             // line too, except it didn't know it released enough to wake more than one.
209             resumeOne(true);
210         }
211     }
212 
213     /**
214      * Try to acquire resources from the semaphore.
215      *
216      * Try to acquire one or more "resources" from the semaphore. To maintain strict FIFO, the acquire will fail if
217      * another request is currently pending, even if there are enough resources to satisfy both requests.
218      *
219      * returns:
220      * Returns `true` if the request was granted.
221      */
222     bool tryAcquire(size_t amount = 1) nothrow @safe @nogc {
223         if( requestsPending>0 )
224             // There are other waiters. Won't satisfy immediately
225             return false;
226 
227         if( available<amount ) {
228             return false;
229         }
230 
231         available-=amount;
232         return true;
233     }
234 
235     /**
236      * Release resources acquired via acquire.
237      */
238     void release(size_t amount = 1) nothrow @safe @nogc {
239         ASSERT!"Semaphore.release called to release 0 coins"(amount>0);
240 
241         available += amount;
242         ASSERT!"Semaphore.release(%s) called results in %s available coins but only %s capacity"(
243                 available<=capacity, amount, available, capacity );
244 
245         if( requestsPending>0 )
246             resumeOne(false);
247     }
248 
249 private:
250     void resumeOne(bool immediate) nothrow @safe @nogc {
251         if( !resumePending ) {
252             if( primaryWaiter.isValid ) {
253                 theReactor.resumeFiber(primaryWaiter, immediate);
254             } else {
255                 waiters.resumeOne(immediate);
256             }
257 
258             resumePending = true;
259         }
260     }
261 
262     void suspendSecondary(Timeout timeout) @safe @nogc {
263         waiters.suspend(timeout);
264         ASSERT!"Semaphore woke up without anyone owning up to waking us up."(resumePending);
265 
266         resumePending = false;
267     }
268 
269     void suspendPrimary(Timeout timeout) @safe @nogc {
270         scope(failure) resumeOne(false);
271 
272         DBG_ASSERT!"Cannot have two primary waiters"(!primaryWaiter.isValid);
273         primaryWaiter = theReactor.currentFiberHandle();
274         scope(exit) {
275             resumePending = false;
276             primaryWaiter.reset();
277         }
278 
279         theReactor.suspendCurrentFiber(timeout);
280         ASSERT!"Semaphore woke up without anyone owning up to waking us up."(resumePending);
281     }
282 }
283 
284 unittest {
285     import mecca.reactor;
286 
287     theReactor.setup();
288     scope(exit) theReactor.teardown();
289 
290     uint[4] counters;
291     Semaphore sem;
292     uint doneCount;
293 
294     sem.open(3);
295 
296     void func(uint cnt) {
297         sem.acquire(3);
298         theReactor.yield();
299         sem.release(3);
300 
301         foreach(i; 0..1000) {
302             sem.acquire(2);
303             counters[cnt]++;
304             sem.release(2);
305         }
306 
307         theReactor.stop();
308     }
309 
310     theReactor.spawnFiber(&func, 0);
311     theReactor.spawnFiber(&func, 1);
312     theReactor.spawnFiber(&func, 2);
313     theReactor.spawnFiber(&func, 3);
314 
315     theReactor.start();
316 
317     INFO!"Counters at end: %s"(counters);
318     foreach(i, cnt; counters) {
319         ASSERT!"Counter %s not correct: %s"(cnt>=999, i, counters);
320     }
321 }
322 
323 unittest {
324     import mecca.reactor.sync.barrier;
325 
326     uint counter;
327     auto sem = Semaphore(4);
328     Barrier barrier;
329 
330     void fib(uint expected) {
331         assert(counter==0);
332         sem.acquire(4);
333         assertEQ(counter, expected, "Out of order acquire");
334         counter++;
335         sem.release(4);
336 
337         barrier.markDone();
338     }
339 
340     testWithReactor({
341             sem.acquire(4);
342 
343             foreach(uint i; 0..10) {
344                 theReactor.spawnFiber(&fib, i);
345                 barrier.addWaiter();
346                 theReactor.yield;
347             }
348 
349             sem.release(1);
350             theReactor.yield();
351             sem.release(3);
352 
353             barrier.waitAll();
354 
355             assert(counter==10);
356         });
357 }
358 
359 unittest {
360     auto sem = Semaphore(2);
361     bool gotIt;
362 
363     void fiberFunc() {
364         sem.acquire();
365         scope(exit) sem.release();
366 
367         gotIt = true;
368 
369         theReactor.yield();
370     }
371 
372     void mainFunc() {
373         assert(sem.tryAcquire(2), "tryAcquire failed on empty semaphore");
374 
375         auto fib = theReactor.spawnFiber(&fiberFunc);
376         theReactor.yield();
377         assert(!gotIt, "semaphore acquired despite no available resources");
378 
379         sem.release(2);
380         assert(!gotIt, "release shouldn't yield");
381 
382         assert(!sem.tryAcquire(), "tryAcquire succeeded despite pending fibers");
383         theReactor.yield();
384         assert(gotIt, "Fiber didn't acquire despite release");
385         assert(fib.isValid, "Fiber quite too soon");
386         assertEQ(sem.level, 1, "Semaphore level is incorrect");
387         assert(sem.tryAcquire(), "tryAcquire failed despite available resources");
388 
389         theReactor.joinFiber(fib);
390         assertEQ(sem.level, 1, "Semaphore level is incorrect");
391     }
392 
393     testWithReactor(&mainFunc);
394 }
395 
396 unittest {
397     // WEKAPP-74912: exception injected during acquire
398     META!"Test exception injection during acquire"();
399     import mecca.reactor;
400 
401     Semaphore sem;
402     Timeout timeout;
403 
404     sem.open(1);
405 
406     void testerBody() {
407         sem.acquire(1, timeout);
408         sem.release();
409     }
410 
411     testWithReactor( {
412             timeout = Timeout(300.msecs);
413             sem.acquire(1, timeout);
414 
415             auto fh1 = theReactor.spawnFiber(&testerBody);
416             theReactor.yield();
417 
418             auto fh2 = theReactor.spawnFiber(&testerBody);
419             theReactor.yield();
420 
421             auto fh3 = theReactor.spawnFiber(&testerBody);
422             theReactor.yield();
423 
424             sem.release();
425             theReactor.throwInFiber!FiberKilled(fh1);
426             theReactor.joinFiber(fh1, timeout);
427 
428             theReactor.joinFiber(fh2, timeout);
429             theReactor.joinFiber(fh3, timeout);
430         });
431 }
432 
433 version(unittest):
434 import mecca.reactor.sync.barrier;
435 import mecca.reactor.sync.event;
436 
437 class SetCapacityTests {
438     Semaphore sem;
439     uint counter;
440     Barrier barrier;
441 
442     void open(uint capacity, uint used = 0) {
443         assertEQ(barrier.numWaiters, 0, "Barrier not clear on open");
444         sem.open(capacity, used);
445         counter = 0;
446     }
447 
448     void reset() {
449         sem.close();
450     }
451 
452     void fib(uint howMuch, Event* clearToExit) {
453         scope(exit) barrier.markDone();
454 
455         sem.acquire(howMuch);
456         scope(exit) sem.release(howMuch);
457 
458         counter++;
459         clearToExit.wait();
460     }
461 
462     FiberHandle spawn(uint howMuch, Event* clearToExit) {
463         auto fh = theReactor.spawnFiber(&fib, howMuch, clearToExit);
464         barrier.addWaiter();
465 
466         return fh;
467     }
468 
469     @mecca_ut void releaseOnCapacityIncrease() {
470         open(1);
471         scope(success) reset();
472 
473         Event clearToExit;
474 
475         spawn(1, &clearToExit);
476         spawn(1, &clearToExit);
477 
478         theReactor.yield();
479         theReactor.yield();
480         assertEQ(counter, 1, "Acquire succeeded, should have failed");
481 
482         sem.setCapacity(2);
483         theReactor.yield();
484         assertEQ(counter, 2, "Acquire failed, should have succeeded");
485 
486         clearToExit.set();
487         barrier.waitAll();
488         assertEQ(counter, 2);
489     }
490 
491     @mecca_ut void reduceCapacity() {
492         open(4);
493         scope(success) reset();
494 
495         Event blocker;
496 
497         spawn(2, &blocker);
498         spawn(2, &blocker);
499         spawn(2, &blocker);
500 
501         theReactor.yield();
502         theReactor.yield();
503         assertEQ(counter, 2);
504 
505         blocker.set();
506         sem.setCapacity(2);
507         assertEQ(counter, 3);
508 
509         blocker.reset();
510 
511         spawn(2, &blocker);
512         spawn(2, &blocker);
513         theReactor.yield();
514         assertEQ(counter, 4);
515 
516         blocker.set();
517         barrier.waitAll();
518         assertEQ(counter, 5);
519     }
520 }
521 
522 mixin TEST_FIXTURE_REACTOR!SetCapacityTests;