1 /// Reactor aware semaphore 2 module mecca.reactor.sync.semaphore; 3 4 // Licensed under the Boost license. Full copyright information in the AUTHORS file 5 6 import std.algorithm; 7 8 import mecca.lib.exception; 9 import mecca.lib.time; 10 import mecca.log; 11 import mecca.reactor; 12 import mecca.reactor.sync.fiber_queue; 13 14 /// Reactor aware semaphore 15 struct Semaphore { 16 private: 17 /+ Semaphore fairness is assured using a two stage process. 18 During acquire, if there are other waiting acquirers, we place ourselves in the "waiters" queue. 19 20 Once we're at the top of the queue, we register our fiber handle as primaryWaiter. 21 +/ 22 size_t _capacity, pendingCapacity; 23 long available; // May be negative due to capacity change 24 size_t requestsPending; 25 FiberQueue waiters; 26 FiberHandle primaryWaiter; 27 bool resumePending; // Makes sure only one fiber gets woken up at any one time 28 29 public: 30 @disable this(this); 31 32 /** 33 * Construct a semaphore with given capacity. 34 */ 35 this(size_t capacity, size_t used = 0) pure nothrow @safe @nogc { 36 open(capacity, used); 37 } 38 39 /** 40 * Call this function before using the semaphore. 41 * 42 * You can skip calling `open` if the semaphore is constructed explicitly 43 * 44 * Params: 45 * capacity = maximal value the semaphore can grow to. 46 * used = initial number of obtained locks. Default of zero means that the semaphore is currently unused. 47 */ 48 void open(size_t capacity, size_t used = 0) pure nothrow @safe @nogc { 49 ASSERT!"Semaphore.open called on already open semaphore"(_capacity==0); 50 ASSERT!"Semaphore.open called with capacity 0"(capacity>0); 51 ASSERT!"Semaphore.open called with initial used count %s greater than capacity %s"(used<=capacity, used, capacity); 52 53 _capacity = capacity; 54 available = _capacity - used; 55 pendingCapacity = 0; 56 requestsPending = 0; 57 ASSERT!"open called with waiters in queue"( waiters.empty ); 58 } 59 60 /** 61 * Call this function when the semaphore is no longer needed. 62 * 63 * This function is mostly useful for unittests, where the same semaphore instance might be used multiple times. 64 */ 65 void close() pure nothrow @safe @nogc { 66 ASSERT!"Semaphore.close called on a non-open semaphore"(_capacity > 0); 67 ASSERT!"Semaphore.close called while fibers are waiting on semaphore"(waiters.empty); 68 _capacity = 0; 69 } 70 71 /// Report the capacity of the semaphore 72 @property size_t capacity() const pure nothrow @safe @nogc { 73 return _capacity; 74 } 75 76 /** Change the capacity of the semaphore 77 * 78 * If `immediate` is set to `false` and the current `level` is higher than the requested capacity, `setCapacity` 79 * will sleep until the capacity can be cleared. 80 * 81 * If `immediate` is `false` then `setCapacity` may return before the new capacity is actually set. This will 82 * $(I only) happen if there is an older `setCapacity` call that has not yet finished. 83 * 84 * Params: 85 * newCapacity = The new capacity 86 * immediate = whether the new capacity takes effect immediately. 87 * 88 * Warnings: 89 * Setting the capacity to lower than the number of resources a waiting fiber is currently requesting is undefined. 90 * 91 * If `immediate` is set to `true`, it is possible for `level` to report a higher acquired count than `capacity`. 92 * 93 * If there is a chance that multiple calls to `setCapacity` are active at once, the `immeditate` flag must be set 94 * the same way on all of them. In other words, it is illegal to call `setCapacity` with `immediate` set to `false`, 95 * and then call `setCapacity` with `immediate` set to true before the first call returns. 96 */ 97 void setCapacity(size_t newCapacity, bool immediate = false) @safe @nogc { 98 DBG_ASSERT!"setCapacity called with immediate set and a previous call still pending"( 99 !immediate || pendingCapacity == 0 ); 100 101 if( (newCapacity>=_capacity && pendingCapacity==0) || immediate ) { 102 // Fast path 103 available += newCapacity - _capacity; 104 _capacity = newCapacity; 105 106 // It is possible that a fiber that previously was blocked can now move forward 107 resumeOne(false); 108 109 return; 110 } 111 112 // We cannot complete the capacity change immediately 113 if( pendingCapacity!=0 ) { 114 // Just piggyback the pending capacity change 115 pendingCapacity = newCapacity; 116 return; 117 } 118 119 // We need to wait ourselves for the change to be possible 120 pendingCapacity = newCapacity; 121 scope(exit) pendingCapacity = 0; 122 123 while( pendingCapacity!=this.capacity ) { 124 assertEQ( newCapacity, pendingCapacity ); 125 126 // If we ask to reduce the capacity, and then reduce it again, we might need to run this loop more than once 127 assertLT( pendingCapacity, this.capacity, "pendingCapacity not <= level" ); 128 size_t numAcquired = this.capacity - pendingCapacity; 129 acquire(numAcquired); 130 // It is never released as such. We instead manipulate the state accordingly 131 132 // Calculate the capacity closest to the one we want we can saftly reach right now 133 newCapacity = max(newCapacity, pendingCapacity); 134 135 _capacity = newCapacity; 136 if( newCapacity>this.capacity ) { 137 // We need to increase the capcity. 138 release(numAcquired); 139 // This also releases any further clients waiting 140 } else { 141 // available was already substracted by the acquire. Nothing more to do here 142 } 143 144 newCapacity = pendingCapacity; 145 } 146 } 147 148 /** 149 * Report the current amount of available resources. 150 * 151 * This amount includes all currently pending acquire requests. 152 */ 153 @property size_t level() const pure nothrow @safe @nogc { 154 if( available < requestsPending ) 155 return 0; 156 157 return available - requestsPending; 158 } 159 160 /** 161 * acquire resources from the semaphore. 162 * 163 * Acquire one or more "resources" from the semaphore. Sleep if not enough are available. The semaphore guarantees a 164 * strict FIFO. A new request, even if satifiable, will not be granted until all older requests are granted. 165 * 166 * Params: 167 * amount = the amount of resources to request. 168 * timeout = how long to wait for resources to become available. 169 * 170 * Throws: 171 * TimeoutExpired if timeout has elapsed without satisfying the request. 172 * 173 * Also, any other exception may be thrown if injected using theReactor.throwInFiber. 174 */ 175 void acquire(size_t amount = 1, Timeout timeout = Timeout.infinite) @safe @nogc { 176 ASSERT!"Semaphore tried to acquire %s, but total capacity is only %s"( amount<=capacity, amount, capacity ); 177 requestsPending += amount; 178 scope(exit) requestsPending -= amount; 179 180 bool slept; 181 if( requestsPending>amount ) { 182 // There are others waiting before us 183 suspendSecondary(timeout); 184 slept = true; 185 // XXX Should we set the priority back so that all sleeps are the same priority? 186 } 187 188 while( available<amount ) { 189 if( slept ) { 190 DEBUG!"Spurious wakeup waiting to acquire %s from semaphore. Available %s, capacity %s"( 191 amount, available, capacity); 192 } 193 194 slept = true; 195 suspendPrimary(timeout); 196 } 197 // In case we didn't sleep 198 theReactor.assertMayContextSwitch(); 199 200 DBG_ASSERT!"Semaphore has %s requests pending including us, but we're requesting %s"( 201 requestsPending >= amount, requestsPending, amount); 202 203 available -= amount; 204 // requestsPending -= amount; Will be done by scope(exit) above. We're not sleeping until the end of the function 205 206 if( requestsPending>amount && available>0 ) { 207 // If there are other pendings, we've slept. The same release that woke us up should have woken the next in 208 // line too, except it didn't know it released enough to wake more than one. 209 resumeOne(true); 210 } 211 } 212 213 /** 214 * Try to acquire resources from the semaphore. 215 * 216 * Try to acquire one or more "resources" from the semaphore. To maintain strict FIFO, the acquire will fail if 217 * another request is currently pending, even if there are enough resources to satisfy both requests. 218 * 219 * returns: 220 * Returns `true` if the request was granted. 221 */ 222 bool tryAcquire(size_t amount = 1) nothrow @safe @nogc { 223 if( requestsPending>0 ) 224 // There are other waiters. Won't satisfy immediately 225 return false; 226 227 if( available<amount ) { 228 return false; 229 } 230 231 available-=amount; 232 return true; 233 } 234 235 /** 236 * Release resources acquired via acquire. 237 */ 238 void release(size_t amount = 1) nothrow @safe @nogc { 239 ASSERT!"Semaphore.release called to release 0 coins"(amount>0); 240 241 available += amount; 242 ASSERT!"Semaphore.release(%s) called results in %s available coins but only %s capacity"( 243 available<=capacity, amount, available, capacity ); 244 245 if( requestsPending>0 ) 246 resumeOne(false); 247 } 248 249 private: 250 void resumeOne(bool immediate) nothrow @safe @nogc { 251 if( !resumePending ) { 252 if( primaryWaiter.isValid ) { 253 theReactor.resumeFiber(primaryWaiter, immediate); 254 } else { 255 waiters.resumeOne(immediate); 256 } 257 258 resumePending = true; 259 } 260 } 261 262 void suspendSecondary(Timeout timeout) @safe @nogc { 263 waiters.suspend(timeout); 264 ASSERT!"Semaphore woke up without anyone owning up to waking us up."(resumePending); 265 266 resumePending = false; 267 } 268 269 void suspendPrimary(Timeout timeout) @safe @nogc { 270 scope(failure) resumeOne(false); 271 272 DBG_ASSERT!"Cannot have two primary waiters"(!primaryWaiter.isValid); 273 primaryWaiter = theReactor.currentFiberHandle(); 274 scope(exit) { 275 resumePending = false; 276 primaryWaiter.reset(); 277 } 278 279 theReactor.suspendCurrentFiber(timeout); 280 ASSERT!"Semaphore woke up without anyone owning up to waking us up."(resumePending); 281 } 282 } 283 284 unittest { 285 import mecca.reactor; 286 287 theReactor.setup(); 288 scope(exit) theReactor.teardown(); 289 290 uint[4] counters; 291 Semaphore sem; 292 uint doneCount; 293 294 sem.open(3); 295 296 void func(uint cnt) { 297 sem.acquire(3); 298 theReactor.yield(); 299 sem.release(3); 300 301 foreach(i; 0..1000) { 302 sem.acquire(2); 303 counters[cnt]++; 304 sem.release(2); 305 } 306 307 theReactor.stop(); 308 } 309 310 theReactor.spawnFiber(&func, 0); 311 theReactor.spawnFiber(&func, 1); 312 theReactor.spawnFiber(&func, 2); 313 theReactor.spawnFiber(&func, 3); 314 315 theReactor.start(); 316 317 INFO!"Counters at end: %s"(counters); 318 foreach(i, cnt; counters) { 319 ASSERT!"Counter %s not correct: %s"(cnt>=999, i, counters); 320 } 321 } 322 323 unittest { 324 import mecca.reactor.sync.barrier; 325 326 uint counter; 327 auto sem = Semaphore(4); 328 Barrier barrier; 329 330 void fib(uint expected) { 331 assert(counter==0); 332 sem.acquire(4); 333 assertEQ(counter, expected, "Out of order acquire"); 334 counter++; 335 sem.release(4); 336 337 barrier.markDone(); 338 } 339 340 testWithReactor({ 341 sem.acquire(4); 342 343 foreach(uint i; 0..10) { 344 theReactor.spawnFiber(&fib, i); 345 barrier.addWaiter(); 346 theReactor.yield; 347 } 348 349 sem.release(1); 350 theReactor.yield(); 351 sem.release(3); 352 353 barrier.waitAll(); 354 355 assert(counter==10); 356 }); 357 } 358 359 unittest { 360 auto sem = Semaphore(2); 361 bool gotIt; 362 363 void fiberFunc() { 364 sem.acquire(); 365 scope(exit) sem.release(); 366 367 gotIt = true; 368 369 theReactor.yield(); 370 } 371 372 void mainFunc() { 373 assert(sem.tryAcquire(2), "tryAcquire failed on empty semaphore"); 374 375 auto fib = theReactor.spawnFiber(&fiberFunc); 376 theReactor.yield(); 377 assert(!gotIt, "semaphore acquired despite no available resources"); 378 379 sem.release(2); 380 assert(!gotIt, "release shouldn't yield"); 381 382 assert(!sem.tryAcquire(), "tryAcquire succeeded despite pending fibers"); 383 theReactor.yield(); 384 assert(gotIt, "Fiber didn't acquire despite release"); 385 assert(fib.isValid, "Fiber quite too soon"); 386 assertEQ(sem.level, 1, "Semaphore level is incorrect"); 387 assert(sem.tryAcquire(), "tryAcquire failed despite available resources"); 388 389 theReactor.joinFiber(fib); 390 assertEQ(sem.level, 1, "Semaphore level is incorrect"); 391 } 392 393 testWithReactor(&mainFunc); 394 } 395 396 unittest { 397 // WEKAPP-74912: exception injected during acquire 398 META!"Test exception injection during acquire"(); 399 import mecca.reactor; 400 401 Semaphore sem; 402 Timeout timeout; 403 404 sem.open(1); 405 406 void testerBody() { 407 sem.acquire(1, timeout); 408 sem.release(); 409 } 410 411 testWithReactor( { 412 timeout = Timeout(300.msecs); 413 sem.acquire(1, timeout); 414 415 auto fh1 = theReactor.spawnFiber(&testerBody); 416 theReactor.yield(); 417 418 auto fh2 = theReactor.spawnFiber(&testerBody); 419 theReactor.yield(); 420 421 auto fh3 = theReactor.spawnFiber(&testerBody); 422 theReactor.yield(); 423 424 sem.release(); 425 theReactor.throwInFiber!FiberKilled(fh1); 426 theReactor.joinFiber(fh1, timeout); 427 428 theReactor.joinFiber(fh2, timeout); 429 theReactor.joinFiber(fh3, timeout); 430 }); 431 } 432 433 version(unittest): 434 import mecca.reactor.sync.barrier; 435 import mecca.reactor.sync.event; 436 437 class SetCapacityTests { 438 Semaphore sem; 439 uint counter; 440 Barrier barrier; 441 442 void open(uint capacity, uint used = 0) { 443 assertEQ(barrier.numWaiters, 0, "Barrier not clear on open"); 444 sem.open(capacity, used); 445 counter = 0; 446 } 447 448 void reset() { 449 sem.close(); 450 } 451 452 void fib(uint howMuch, Event* clearToExit) { 453 scope(exit) barrier.markDone(); 454 455 sem.acquire(howMuch); 456 scope(exit) sem.release(howMuch); 457 458 counter++; 459 clearToExit.wait(); 460 } 461 462 FiberHandle spawn(uint howMuch, Event* clearToExit) { 463 auto fh = theReactor.spawnFiber(&fib, howMuch, clearToExit); 464 barrier.addWaiter(); 465 466 return fh; 467 } 468 469 @mecca_ut void releaseOnCapacityIncrease() { 470 open(1); 471 scope(success) reset(); 472 473 Event clearToExit; 474 475 spawn(1, &clearToExit); 476 spawn(1, &clearToExit); 477 478 theReactor.yield(); 479 theReactor.yield(); 480 assertEQ(counter, 1, "Acquire succeeded, should have failed"); 481 482 sem.setCapacity(2); 483 theReactor.yield(); 484 assertEQ(counter, 2, "Acquire failed, should have succeeded"); 485 486 clearToExit.set(); 487 barrier.waitAll(); 488 assertEQ(counter, 2); 489 } 490 491 @mecca_ut void reduceCapacity() { 492 open(4); 493 scope(success) reset(); 494 495 Event blocker; 496 497 spawn(2, &blocker); 498 spawn(2, &blocker); 499 spawn(2, &blocker); 500 501 theReactor.yield(); 502 theReactor.yield(); 503 assertEQ(counter, 2); 504 505 blocker.set(); 506 sem.setCapacity(2); 507 assertEQ(counter, 3); 508 509 blocker.reset(); 510 511 spawn(2, &blocker); 512 spawn(2, &blocker); 513 theReactor.yield(); 514 assertEQ(counter, 4); 515 516 blocker.set(); 517 barrier.waitAll(); 518 assertEQ(counter, 5); 519 } 520 } 521 522 mixin TEST_FIXTURE_REACTOR!SetCapacityTests;