1 /*** 2 * Implements a reactor aware mutex 3 * Authors: Shachar Shemesh 4 * Copyright: ©2017 Weka.io Ltd. 5 */ 6 module mecca.reactor.sync.lock; 7 8 // Licensed under the Boost license. Full copyright information in the AUTHORS file 9 10 import mecca.lib.reflection; 11 12 import mecca.log; 13 import mecca.lib.exception; 14 import mecca.lib.time; 15 import mecca.reactor; 16 import mecca.reactor.sync.barrier; 17 import mecca.reactor.sync.fiber_queue; 18 19 /** 20 A reactor aware non-recursive simple mutex. 21 22 This struct can be used for synchronizing different fibers. It cannot be used for synchronizing threads not running under the same 23 reactor. 24 */ 25 struct Lock { 26 private: 27 FiberQueue waiters; 28 FiberHandle _owner; 29 uint numRequesting; // Including the fiber already holding the lock 30 31 public: 32 /** Acquire the lock. Suspend the fiber if currently acquired. 33 * 34 * Upon return, the mutex is acquired. It is up to the caller to call release. 35 * 36 * The call is guaranteed not to sleep if the mutex is available. 37 * 38 * Throws: 39 * TimeoutExpired if the timeout expires 40 * 41 * Anything injected through a call to Reactor.throwInFiber 42 */ 43 void acquire(Timeout timeout = Timeout.infinite) @safe @nogc { 44 theReactor.assertMayContextSwitch("Lock acquire"); 45 DBG_ASSERT!"DEADLOCK: trying to acquire a lock by the same fiber already holding it"( 46 owner!=theReactor.currentFiberHandle); 47 48 scope(success) { 49 DBG_ASSERT!"Owner set when granting the lock"(!owner.isSet); 50 _owner = theReactor.currentFiberHandle; 51 } 52 53 if( numRequesting==0 ) { 54 // Fast path 55 numRequesting = 1; 56 theReactor.assertMayContextSwitch(); 57 return; 58 } 59 60 numRequesting++; 61 scope(failure) numRequesting--; 62 63 waiters.suspend(timeout); 64 } 65 66 /** Release a previously acquired lock. 67 * 68 * This funciton must be called from the same fiber that called the matching `acquire`. 69 */ 70 void release() nothrow @safe @nogc { 71 ASSERT!"Lock.release called on a non-acquired lock"(numRequesting>=1); 72 ASSERT!"Lock acquired by %s but released by %s"( 73 owner==theReactor.currentFiberHandle, owner, theReactor.currentFiberHandle); 74 numRequesting--; 75 _owner.reset(); 76 77 if( numRequesting>0 ) 78 waiters.resumeOne(); 79 } 80 81 /// Returns whether the lock is currently held. 82 @property bool isLocked() pure const nothrow @safe @nogc { 83 return numRequesting>0; 84 } 85 86 /// Returns the `FiberHandle` of the current owner of the lock. 87 /// 88 /// Function returns `FiberHandle.init` if not currently locked. This is $(I not) the same as calling `isLocked`. 89 /// There are some cases where `isLocked` will return `true` but `owner` will return the invalid handle. 90 @property FiberHandle owner() pure const nothrow @safe @nogc { 91 return _owner; 92 } 93 } 94 95 unittest { 96 import mecca.reactor; 97 import mecca.reactor.sync.barrier; 98 99 Lock lock; 100 Barrier barrier; 101 bool held; 102 103 void waiter() { 104 foreach(i; 0..3) { 105 lock.acquire(); 106 scope(exit) { 107 lock.release(); 108 held = false; 109 } 110 111 ASSERT!"Lock held on entry"(!held); 112 113 theReactor.sleep(1.msecs); 114 } 115 116 barrier.markDone(); 117 } 118 119 TscTimePoint begin = TscTimePoint.hardNow; 120 testWithReactor({ 121 foreach(i; 0..7) { 122 barrier.addWaiter(); 123 theReactor.spawnFiber(&waiter); 124 } 125 barrier.waitAll(); 126 ASSERT!"Lock held after end"(!held); 127 128 assert(!lock.isLocked); 129 }); 130 TscTimePoint end = TscTimePoint.hardNow; 131 132 assert(end - begin >= 21.msecs, "mutex did not mutually exclude"); 133 } 134 135 /** 136 Shared access lock 137 138 A standard Read-Write lock. Supports multiple readers or a single writer. 139 */ 140 struct SharedLock { 141 private: 142 Lock acquireLock; 143 Barrier sharedLockers; 144 145 public: 146 /// The state of the lock, as returned by `state`. 147 enum LockState { 148 Unlocked, /// Lock is unlocked 149 Shared, /// Lock is in shared mode 150 SharedWithExclusivePending, /// Lock is in shared mode, and there are fibers waiting to lock it exclusively 151 Exclusive, /// Lock is in exclusive mode. 152 } 153 154 /** 155 * Acquire an exclusive access lock 156 */ 157 @notrace void acquireExclusive(Timeout timeout = Timeout.infinite) @safe @nogc { 158 theReactor.assertMayContextSwitch("Lock acquire"); 159 160 acquireLock.acquire(timeout); 161 scope(failure) acquireLock.release(); 162 sharedLockers.waitAll(timeout); 163 } 164 165 /** 166 * Acquire a shared access lock 167 */ 168 @notrace void acquireShared(Timeout timeout = Timeout.infinite) @safe @nogc { 169 theReactor.assertMayContextSwitch("Lock acquire"); 170 171 acquireLock.acquire(timeout); 172 scope(exit) acquireLock.release(); 173 174 sharedLockers.addWaiter(); 175 } 176 177 /// Release a previously acquired exclusive access lock 178 @notrace void releaseExclusive() nothrow @safe @nogc { 179 DBG_ASSERT!"Have shared lockers during exclusive unlock"(!sharedLockers.hasWaiters); 180 acquireLock.release(); 181 } 182 183 /// Release a previously acquired shared access lock 184 @notrace void releaseShared() nothrow @safe @nogc { 185 DBG_ASSERT!"releaseShared call but no shared lockers"(sharedLockers.hasWaiters); 186 sharedLockers.markDone(); 187 } 188 189 /// Return the current state of the lock. 190 /// 191 /// Returns: 192 /// see `LockState` 193 @property LockState state() pure const nothrow @safe @nogc { 194 if( acquireLock.isLocked ) { 195 if( sharedLockers.hasWaiters ) 196 return LockState.SharedWithExclusivePending; 197 198 return LockState.Exclusive; 199 } 200 201 if( sharedLockers.hasWaiters ) 202 return LockState.Shared; 203 204 return LockState.Unlocked; 205 } 206 } 207 208 /** 209 Unfair shared access lock 210 211 This behaves like a standard read-write lock, except an exclusive lock is only obtained after all shared users have 212 relinquished the lock. 213 */ 214 struct UnfairSharedLock { 215 import mecca.reactor.sync.semaphore : Semaphore; 216 217 private: 218 Semaphore lock = Semaphore(1); 219 uint numSharedLockers; 220 221 public: 222 /// Lock state enum 223 alias LockState = SharedLock.LockState; 224 225 /** 226 * Acquire an exclusive access lock 227 */ 228 @notrace void acquireExclusive(Timeout timeout = Timeout.infinite) @safe @nogc { 229 theReactor.assertMayContextSwitch("Lock acquire"); 230 231 lock.acquire(1, timeout); 232 DBG_ASSERT!"Exclusivly locked but have shared lockers"(numSharedLockers==0); 233 } 234 235 /** 236 * Acquire a shared access lock 237 */ 238 @notrace void acquireShared(Timeout timeout = Timeout.infinite) @safe @nogc { 239 theReactor.assertMayContextSwitch("Lock acquire"); 240 241 if( numSharedLockers==0 ) { 242 lock.acquire(1, timeout); 243 } 244 245 DBG_ASSERT!"Shared lockers but lock is not acquired"(lock.level==0); 246 numSharedLockers++; 247 } 248 249 /// Release a previously acquired exclusive access lock 250 @notrace void releaseExclusive() nothrow @safe @nogc { 251 lock.release(); 252 } 253 254 /// Release a previously acquired shared access lock 255 @notrace void releaseShared() nothrow @safe @nogc { 256 DBG_ASSERT!"releaseShared call but no shared lockers"(numSharedLockers>0); 257 258 if( --numSharedLockers==0 ) { 259 lock.release(); 260 } 261 } 262 263 264 /// Return the current state of the lock. 265 /// 266 /// This function will never return the `SharedWithExclusivePending` state. 267 /// Returns: 268 /// see `LockState` 269 @property LockState state() pure const nothrow @safe @nogc { 270 if( numSharedLockers>0 ) 271 return LockState.Shared; 272 273 if( lock.level==0 ) 274 return LockState.Exclusive; 275 276 return LockState.Unlocked; 277 } 278 } 279 280 version(unittest) { 281 282 private mixin template Test(SharedLockType) { 283 uint generation; 284 SharedLockType lock; 285 Barrier allDone; 286 287 void exclusiveTest(uint gen) { 288 scope(exit) allDone.markDone(); 289 290 DEBUG!"Obtaining exclusive lock gen %s"(gen); 291 lock.acquireExclusive(); 292 assertEQ(lock.state, lock.LockState.Exclusive); 293 scope(exit) { 294 DEBUG!"Releasing exclusive lock gen %s"(gen); 295 lock.releaseExclusive(); 296 } 297 DEBUG!"Obtained exclusive lock gen %s"(gen); 298 299 assertEQ(generation , gen); 300 generation++; 301 foreach(i; 0..10) { 302 theReactor.yield(); 303 } 304 assertEQ(generation , gen+1); 305 } 306 307 void sharedTest(uint gen) { 308 scope(exit) allDone.markDone(); 309 310 DEBUG!"Obtaining shared lock gen %s"(gen); 311 lock.acquireShared(); 312 scope(exit) { 313 DEBUG!"Releasing shared lock gen %s"(gen); 314 lock.releaseShared(); 315 } 316 DEBUG!"Obtained shared lock gen %s"(gen); 317 318 assertEQ(generation , gen); 319 foreach(i; 0..10) { 320 theReactor.yield(); 321 } 322 assertEQ(generation , gen); 323 } 324 325 void startExclusive(uint gen) { 326 theReactor.spawnFiber( &exclusiveTest, gen ); 327 allDone.addWaiter(); 328 theReactor.yield(); 329 } 330 void startShared(uint gen) { 331 theReactor.spawnFiber( &sharedTest, gen ); 332 allDone.addWaiter(); 333 theReactor.yield(); 334 } 335 } 336 337 unittest { 338 mixin Test!SharedLock; 339 340 void testBody() { 341 assertEQ(lock.state, lock.LockState.Unlocked); 342 startShared(0); 343 assertEQ(lock.state, lock.LockState.Shared); 344 startShared(0); 345 assertEQ(lock.state, lock.LockState.Shared); 346 startExclusive(0); 347 assertEQ(lock.state, lock.LockState.SharedWithExclusivePending); 348 startShared(1); 349 startShared(1); 350 startShared(1); 351 startShared(1); 352 startExclusive(1); 353 startShared(2); 354 355 allDone.waitAll(); 356 assertEQ(lock.state, lock.LockState.Unlocked); 357 lock.acquireExclusive(Timeout(Duration.zero)); 358 assertEQ(lock.state, lock.LockState.Exclusive); 359 } 360 361 testWithReactor(&testBody); 362 } 363 364 unittest { 365 mixin Test!UnfairSharedLock; 366 367 void testBody() { 368 assertEQ(lock.state, lock.LockState.Unlocked); 369 startShared(0); 370 assertEQ(lock.state, lock.LockState.Shared); 371 startShared(0); 372 startExclusive(0); 373 assertEQ(lock.state, lock.LockState.Shared); 374 startShared(0); 375 startShared(0); 376 startShared(0); 377 startShared(0); 378 startExclusive(1); 379 startShared(0); 380 381 allDone.waitAll(); 382 assertEQ(lock.state, lock.LockState.Unlocked); 383 lock.acquireExclusive(Timeout(Duration.zero)); 384 assertEQ(lock.state, lock.LockState.Exclusive); 385 } 386 387 testWithReactor(&testBody); 388 } 389 } 390 391 /// A RAII wrapper for a lock 392 /// 393 /// Params: 394 /// LockType = the type of lock to define over 395 /// acquireName = the name of the function to call to acquire the lock 396 /// releaseName = the name of the function to call to release the lock 397 struct RAIILocker(LockType, string acquireName="acquire", string releaseName="release") { 398 private: 399 import std.format : format; 400 401 LockType* lock; 402 403 public: 404 @disable this(this); 405 // Constructor code is inside the acquire mixin 406 407 /// Auto unlocking destructor 408 ~this() nothrow @nogc { 409 if( lock !is null ) 410 release(); 411 } 412 413 //pragma(msg, acquireCode); 414 415 /// Acquire the lock 416 mixin(acquireCode); 417 418 //pragma(msg, releaseCode); 419 420 // Release the lock 421 mixin(releaseCode); 422 423 /// Report whether the container is currently locked 424 @notrace bool isLocked() const pure nothrow @safe @nogc { 425 return lock !is null; 426 } 427 private: 428 alias AcquireGenerator = CopySignature!( __traits(getMember, LockType, acquireName) ); 429 enum string acquireCode = q{ 430 // XXX I'm not sure why I bother documenting functions inside mixins that the doc will never build 431 /// Construct a locked instance 432 this(ref LockType lock, %1$s) @nogc { 433 acquire(lock, %2$s); 434 } 435 436 /// Acquire the lock 437 /// 438 /// Params: 439 /// lock = an instance of `LockType` to lock 440 /// 441 /// The other arguments are the same as for `LockType.acquire` 442 @notrace void acquire(ref LockType lock, %1$s) @nogc { 443 ASSERT!"Tried to acquire an already locked Locker"(this.lock is null); 444 this.lock = &lock; 445 lock.%3$s(%2$s); 446 } 447 }.format( AcquireGenerator.genDefinitionList, AcquireGenerator.genCallList, acquireName ); 448 449 alias ReleaseGenerator = CopySignature!( __traits(getMember, LockType, releaseName) ); 450 enum string releaseCode = q{ 451 /// Release the lock 452 @notrace void release(%1$s) @nogc { 453 ASSERT!"Tried to release a non locked Locker"(this.lock !is null); 454 scope(exit) this.lock = null; 455 lock.%3$s(%2$s); 456 } 457 }.format( ReleaseGenerator.genDefinitionList, ReleaseGenerator.genCallList, releaseName ); 458 } 459 460 unittest { 461 enum NUM_FIBERS = 5; 462 enum NUM_RUNS = 10; 463 464 uint numRuns; 465 bool locked; 466 Lock lock; 467 import mecca.reactor.sync.barrier: Barrier; 468 Barrier allDone; 469 470 void lockerFiber() { 471 scope(exit) allDone.markDone(); 472 473 foreach(i; 0..NUM_RUNS) { 474 auto locker = Locker(lock); 475 assert(!locked, "Mutual exclusion failed"); 476 locked = true; 477 numRuns++; 478 scope(exit) locked = false; 479 480 theReactor.yield(); 481 } 482 } 483 484 testWithReactor({ 485 foreach(i; 0..NUM_FIBERS) { 486 allDone.addWaiter(); 487 theReactor.spawnFiber(&lockerFiber); 488 } 489 490 allDone.waitAll(); 491 assert(!lock.isLocked, "lock acquired at end of test"); 492 assert(numRuns==NUM_FIBERS*NUM_RUNS); 493 }); 494 } 495 496 /// Locker wrapper for the standard lock 497 alias Locker = RAIILocker!(Lock); 498 /// Locker wrapper for a SharedLock with a shared lock 499 alias SharedLocker = RAIILocker!(SharedLock, "acquireShared", "releaseShared"); 500 /// Locker wrapper for a SharedLock with an exclusive lock 501 alias ExclusiveLocker = RAIILocker!(SharedLock, "acquireExclusive", "releaseExclusive");