1 /***
2  * Implements a reactor aware mutex
3  * Authors: Shachar Shemesh
4  * Copyright: ©2017 Weka.io Ltd.
5  */
6 module mecca.reactor.sync.lock;
7 
8 // Licensed under the Boost license. Full copyright information in the AUTHORS file
9 
10 import mecca.lib.reflection;
11 
12 import mecca.log;
13 import mecca.lib.exception;
14 import mecca.lib.time;
15 import mecca.reactor;
16 import mecca.reactor.sync.barrier;
17 import mecca.reactor.sync.fiber_queue;
18 
/**
  A simple, non-recursive mutex that cooperates with the reactor.

  Use it to serialize fibers running under the same reactor. It offers no protection between threads that are not
  running under that reactor.
 */
struct Lock {
private:
    FiberQueue waiters;     // Fibers suspended while waiting for the lock
    FiberHandle _owner;     // Fiber currently holding the lock, if any
    uint numRequesting;     // Number of waiting fibers plus the current holder

public:
    /** Take the lock, suspending the calling fiber while another fiber holds it.
     *
     * On normal return the calling fiber owns the mutex, and it is the caller's job to call `release`.
     *
     * When the mutex is free the call does not sleep.
     *
     * Throws:
     * TimeoutExpired if the timeout expires
     *
     * Anything injected through a call to Reactor.throwInFiber
     */
    void acquire(Timeout timeout = Timeout.infinite) @safe @nogc {
        theReactor.assertMayContextSwitch("Lock acquire");
        DBG_ASSERT!"DEADLOCK: trying to acquire a lock by the same fiber already holding it"(
                owner!=theReactor.currentFiberHandle);

        if( numRequesting!=0 ) {
            // Contended: register as a requester and queue up. The registration is undone if the wait throws.
            numRequesting++;
            scope(failure) numRequesting--;

            waiters.suspend(timeout);
        } else {
            // Uncontended: claim the lock right away
            numRequesting = 1;
            theReactor.assertMayContextSwitch();
        }

        // Only reached when nothing above threw: record the new owner
        DBG_ASSERT!"Owner set when granting the lock"(!owner.isSet);
        _owner = theReactor.currentFiberHandle;
    }

    /** Give up a lock obtained through `acquire`.
     *
     * This function must be called from the same fiber that called the matching `acquire`.
     */
    void release() nothrow @safe @nogc {
        ASSERT!"Lock.release called on a non-acquired lock"(numRequesting>=1);
        ASSERT!"Lock acquired by %s but released by %s"(
                owner==theReactor.currentFiberHandle, owner, theReactor.currentFiberHandle);
        _owner.reset();
        numRequesting--;

        // Nobody else is asking for the lock
        if( numRequesting==0 )
            return;

        // Wake one waiter; it records itself as owner once its `acquire` resumes
        waiters.resumeOne();
    }

    /// Reports whether the lock is currently taken (held, or in the middle of being handed to a waiter).
    @property bool isLocked() pure const nothrow @safe @nogc {
        return numRequesting!=0;
    }

    /// The `FiberHandle` of the fiber currently owning the lock.
    ///
    /// Yields `FiberHandle.init` if not currently locked. This is $(I not) equivalent to checking `isLocked`:
    /// `isLocked` may report `true` while `owner` still returns the invalid handle (presumably in the window after
    /// `release` has woken a waiter but before that waiter has resumed and registered itself).
    @property FiberHandle owner() pure const nothrow @safe @nogc {
        return _owner;
    }
}
94 
unittest {
    import mecca.reactor;
    import mecca.reactor.sync.barrier;

    Lock lock;
    Barrier barrier;
    bool held; // true while some fiber is inside the critical section

    // Each fiber enters the critical section three times, sleeping 1ms inside it each time
    void waiter() {
        foreach(i; 0..3) {
            lock.acquire();
            scope(exit) {
                // Clear the flag while we still own the lock, then hand the lock over
                held = false;
                lock.release();
            }

            ASSERT!"Lock held on entry"(!held);
            // Mark the critical section as occupied, so a second fiber getting in trips the assert above
            held = true;

            theReactor.sleep(1.msecs);
        }

        barrier.markDone();
    }

    TscTimePoint begin = TscTimePoint.hardNow;
    testWithReactor({
            foreach(i; 0..7) {
                barrier.addWaiter();
                theReactor.spawnFiber(&waiter);
            }
            barrier.waitAll();
            ASSERT!"Lock held after end"(!held);

            assert(!lock.isLocked);
            });
    TscTimePoint end = TscTimePoint.hardNow;

    // 7 fibers * 3 rounds * 1ms each, fully serialized, cannot take less than 21ms
    assert(end - begin >= 21.msecs, "mutex did not mutually exclude");
}
134 
/**
  Shared access lock

  A standard Read-Write lock: any number of concurrent shared holders, or a single exclusive holder.
 */
struct SharedLock {
private:
    // Taken by every acquirer. Exclusive holders keep it until `releaseExclusive`; shared acquirers hold it only
    // while registering themselves.
    Lock        acquireLock;
    // One waiter registered per shared holder
    Barrier     sharedLockers;

public:
    /// The state of the lock, as returned by `state`.
    enum LockState {
        Unlocked,                       /// Lock is unlocked
        Shared,                         /// Lock is in shared mode
        SharedWithExclusivePending,     /// Lock is in shared mode, and there are fibers waiting to lock it exclusively
        Exclusive,                      /// Lock is in exclusive mode.
    }

    /**
     * Obtain the lock for exclusive access
     *
     * The same `timeout` is passed both to taking the internal lock and to waiting for the shared holders to finish.
     */
    @notrace void acquireExclusive(Timeout timeout = Timeout.infinite) @safe @nogc {
        theReactor.assertMayContextSwitch("Lock acquire");

        // Step 1: take the internal lock, which keeps further acquirers out
        acquireLock.acquire(timeout);
        // Step 2: wait for the existing shared holders; give the internal lock back if that wait fails
        scope(failure) acquireLock.release();
        sharedLockers.waitAll(timeout);
    }

    /**
     * Obtain the lock for shared access
     */
    @notrace void acquireShared(Timeout timeout = Timeout.infinite) @safe @nogc {
        theReactor.assertMayContextSwitch("Lock acquire");

        // The internal lock is held only for the duration of the registration
        acquireLock.acquire(timeout);
        scope(exit) acquireLock.release();

        sharedLockers.addWaiter();
    }

    /// Give up exclusive access previously obtained with `acquireExclusive`
    @notrace void releaseExclusive() nothrow @safe @nogc {
        DBG_ASSERT!"Have shared lockers during exclusive unlock"(!sharedLockers.hasWaiters);
        acquireLock.release();
    }

    /// Give up shared access previously obtained with `acquireShared`
    @notrace void releaseShared() nothrow @safe @nogc {
        DBG_ASSERT!"releaseShared call but no shared lockers"(sharedLockers.hasWaiters);
        sharedLockers.markDone();
    }

    /// Report the current state of the lock.
    ///
    /// Returns:
    /// see `LockState`
    @property LockState state() pure const nothrow @safe @nogc {
        const bool gateTaken = acquireLock.isLocked;
        const bool haveSharedHolders = sharedLockers.hasWaiters;

        if( haveSharedHolders )
            return gateTaken ? LockState.SharedWithExclusivePending : LockState.Shared;

        return gateTaken ? LockState.Exclusive : LockState.Unlocked;
    }
}
207 
/**
  Unfair shared access lock

  Works like a standard read-write lock, with one difference: exclusive access is granted only once every shared
  user has let go of the lock.
 */
struct UnfairSharedLock {
    import mecca.reactor.sync.semaphore : Semaphore;

private:
    // Single-slot semaphore: taken by the exclusive holder, or by the first shared holder on behalf of all of them
    Semaphore   lock = Semaphore(1);
    // Number of fibers currently holding the lock in shared mode
    uint        numSharedLockers;

public:
    /// Lock state enum
    alias LockState = SharedLock.LockState;

    /**
     * Obtain the lock for exclusive access
     */
    @notrace void acquireExclusive(Timeout timeout = Timeout.infinite) @safe @nogc {
        theReactor.assertMayContextSwitch("Lock acquire");

        lock.acquire(1, timeout);
        DBG_ASSERT!"Exclusivly locked but have shared lockers"(numSharedLockers==0);
    }

    /**
     * Obtain the lock for shared access
     */
    @notrace void acquireShared(Timeout timeout = Timeout.infinite) @safe @nogc {
        theReactor.assertMayContextSwitch("Lock acquire");

        // Only when there are no shared holders yet does the semaphore need to be taken
        const bool noSharedHolders = numSharedLockers==0;
        if( noSharedHolders ) {
            lock.acquire(1, timeout);
        }

        DBG_ASSERT!"Shared lockers but lock is not acquired"(lock.level==0);
        numSharedLockers++;
    }

    /// Give up exclusive access previously obtained with `acquireExclusive`
    @notrace void releaseExclusive() nothrow @safe @nogc {
        lock.release();
    }

    /// Give up shared access previously obtained with `acquireShared`
    @notrace void releaseShared() nothrow @safe @nogc {
        DBG_ASSERT!"releaseShared call but no shared lockers"(numSharedLockers>0);

        numSharedLockers--;
        // The last shared holder to leave returns the semaphore
        if( numSharedLockers==0 ) {
            lock.release();
        }
    }


    /// Report the current state of the lock.
    ///
    /// This function will never return the `SharedWithExclusivePending` state.
    /// Returns:
    /// see `LockState`
    @property LockState state() pure const nothrow @safe @nogc {
        if( numSharedLockers!=0 )
            return LockState.Shared;

        return lock.level==0 ? LockState.Exclusive : LockState.Unlocked;
    }
}
279 
280 version(unittest) {
281 
// Shared scaffolding for the read-write lock tests. Mixed into a unittest, it provides a lock of the requested
// type, a `generation` counter that only exclusive holders advance, and helpers that spawn test fibers.
private mixin template Test(SharedLockType) {
    uint generation;
    SharedLockType lock;
    Barrier allDone;

    // Fiber body: take the lock exclusively, verify `generation` equals `gen`, advance it, and verify nobody else
    // advanced it while this fiber yielded.
    void exclusiveTest(uint gen) {
        scope(exit) allDone.markDone();

        DEBUG!"Obtaining exclusive lock gen %s"(gen);
        lock.acquireExclusive();
        assertEQ(lock.state, lock.LockState.Exclusive);
        scope(exit) {
            DEBUG!"Releasing exclusive lock gen %s"(gen);
            lock.releaseExclusive();
        }
        DEBUG!"Obtained exclusive lock gen %s"(gen);

        assertEQ(generation, gen);
        generation++;
        for(uint round = 0; round < 10; round++) {
            theReactor.yield();
        }
        assertEQ(generation, gen+1);
    }

    // Fiber body: take the lock shared and verify `generation` stays at `gen` for as long as the lock is held.
    void sharedTest(uint gen) {
        scope(exit) allDone.markDone();

        DEBUG!"Obtaining shared lock gen %s"(gen);
        lock.acquireShared();
        scope(exit) {
            DEBUG!"Releasing shared lock gen %s"(gen);
            lock.releaseShared();
        }
        DEBUG!"Obtained shared lock gen %s"(gen);

        assertEQ(generation, gen);
        for(uint round = 0; round < 10; round++) {
            theReactor.yield();
        }
        assertEQ(generation, gen);
    }

    // Spawn an exclusive-mode test fiber, register it with `allDone`, then yield
    void startExclusive(uint gen) {
        theReactor.spawnFiber( &exclusiveTest, gen );
        allDone.addWaiter();
        theReactor.yield();
    }

    // Spawn a shared-mode test fiber, register it with `allDone`, then yield
    void startShared(uint gen) {
        theReactor.spawnFiber( &sharedTest, gen );
        allDone.addWaiter();
        theReactor.yield();
    }
}
336 
unittest {
    mixin Test!SharedLock;

    // Shared fibers started after a pending exclusive request expect to see the generation that request produces.
    void testBody() {
        assertEQ(lock.state, lock.LockState.Unlocked);

        // Two shared holders at generation 0
        foreach(_; 0..2) {
            startShared(0);
            assertEQ(lock.state, lock.LockState.Shared);
        }

        // An exclusive request arriving while shared holders exist
        startExclusive(0);
        assertEQ(lock.state, lock.LockState.SharedWithExclusivePending);

        // These shared fibers expect generation 1, i.e. to run after the first exclusive holder
        foreach(_; 0..4) {
            startShared(1);
        }
        startExclusive(1);
        startShared(2);

        allDone.waitAll();
        assertEQ(lock.state, lock.LockState.Unlocked);
        // With everything released, exclusive access is obtained even with a zero timeout
        lock.acquireExclusive(Timeout(Duration.zero));
        assertEQ(lock.state, lock.LockState.Exclusive);
    }

    testWithReactor(&testBody);
}
363 
unittest {
    mixin Test!UnfairSharedLock;

    // Every shared fiber here expects generation 0, including those started after an exclusive request.
    void testBody() {
        assertEQ(lock.state, lock.LockState.Unlocked);
        startShared(0);
        assertEQ(lock.state, lock.LockState.Shared);
        startShared(0);

        // A pending exclusive request is not reflected in the reported state
        startExclusive(0);
        assertEQ(lock.state, lock.LockState.Shared);

        foreach(_; 0..4) {
            startShared(0);
        }
        startExclusive(1);
        startShared(0);

        allDone.waitAll();
        assertEQ(lock.state, lock.LockState.Unlocked);
        // With everything released, exclusive access is obtained even with a zero timeout
        lock.acquireExclusive(Timeout(Duration.zero));
        assertEQ(lock.state, lock.LockState.Exclusive);
    }

    testWithReactor(&testBody);
}
389 }
390 
/// A RAII wrapper for a lock
///
/// The wrapper remembers which lock it holds and releases it from its destructor, unless `release` was already
/// called explicitly. The `acquire` and `release` members are generated at compile time so that they take the same
/// extra arguments as the wrapped lock's own acquire/release functions.
///
/// Params:
/// LockType = the type of lock to define over
/// acquireName = the name of the function to call to acquire the lock
/// releaseName = the name of the function to call to release the lock
struct RAIILocker(LockType, string acquireName="acquire", string releaseName="release") {
private:
    import std.format : format;

    // Lock currently held through this wrapper; null while nothing is held
    LockType* lock;

public:
    // Copying would make two wrappers release the same lock
    @disable this(this);
    // Constructor code is inside the acquire mixin

    /// Auto unlocking destructor
    ~this() nothrow @nogc {
        if( lock !is null )
            release();
    }

    //pragma(msg, acquireCode);

    /// Acquire the lock
    mixin(acquireCode);

    //pragma(msg, releaseCode);

    /// Release the lock
    mixin(releaseCode);

    /// Report whether the container is currently locked
    @notrace bool isLocked() const pure nothrow @safe @nogc {
        return lock !is null;
    }
private:
    // NOTE: the token strings below are passed through `format`, comments included, so the comments inside them
    // must not contain the percent character.
    alias AcquireGenerator = CopySignature!( __traits(getMember, LockType, acquireName) );
    enum string acquireCode = q{
        // XXX I'm not sure why I bother documenting functions inside mixins that the doc will never build
        /// Construct a locked instance
        this(ref LockType lock, %1$s) @nogc {
            acquire(lock, %2$s);
        }

        /// Acquire the lock
        ///
        /// Params:
        /// lock = an instance of `LockType` to lock
        ///
        /// The other arguments are the same as for `LockType.acquire`
        ///
        /// If the underlying acquire throws, the Locker stays unlocked.
        @notrace void acquire(ref LockType lock, %1$s) @nogc {
            ASSERT!"Tried to acquire an already locked Locker"(this.lock is null);
            // Record the lock only after it has really been obtained. Otherwise an exception thrown by the
            // underlying acquire would leave the Locker marked as locked, and the destructor would then release
            // a lock this Locker never held.
            lock.%3$s(%2$s);
            this.lock = &lock;
        }
    }.format( AcquireGenerator.genDefinitionList, AcquireGenerator.genCallList, acquireName );

    alias ReleaseGenerator = CopySignature!( __traits(getMember, LockType, releaseName) );
    enum string releaseCode = q{
        /// Release the lock
        @notrace void release(%1$s) @nogc {
            ASSERT!"Tried to release a non locked Locker"(this.lock !is null);
            // Forget the lock whether or not the underlying release succeeds
            scope(exit) this.lock = null;
            lock.%3$s(%2$s);
        }
    }.format( ReleaseGenerator.genDefinitionList, ReleaseGenerator.genCallList, releaseName );
}
459 
unittest {
    import mecca.reactor.sync.barrier: Barrier;

    enum NUM_FIBERS = 5;
    enum NUM_RUNS = 10;

    Lock lock;
    Barrier allDone;
    uint completedRuns;     // Total number of critical section entries, over all fibers
    bool insideCritical;    // Set while some fiber is inside the critical section

    // Enter the critical section NUM_RUNS times through a scoped Locker, yielding while inside it
    void lockerFiber() {
        scope(exit) allDone.markDone();

        foreach(run; 0..NUM_RUNS) {
            auto locker = Locker(lock);
            assert(!insideCritical, "Mutual exclusion failed");
            // Registered after `locker`, so the flag is cleared before the Locker's destructor releases the lock
            scope(exit) insideCritical = false;
            insideCritical = true;
            completedRuns++;

            theReactor.yield();
        }
    }

    testWithReactor({
        foreach(fiberNum; 0..NUM_FIBERS) {
            allDone.addWaiter();
            theReactor.spawnFiber(&lockerFiber);
        }

        allDone.waitAll();
        assert(!lock.isLocked, "lock acquired at end of test");
        assert(completedRuns==NUM_FIBERS*NUM_RUNS);
    });
}
495 
/// Scoped locker over a plain `Lock`
alias Locker = RAIILocker!(Lock, "acquire", "release");
/// Scoped locker that holds a `SharedLock` in shared mode
alias SharedLocker = RAIILocker!(SharedLock, "acquireShared", "releaseShared");
/// Scoped locker that holds a `SharedLock` in exclusive mode
alias ExclusiveLocker = RAIILocker!(SharedLock, "acquireExclusive", "releaseExclusive");