1 /// Reactor aware queue that waits for operations to be possible
2 module mecca.reactor.sync.queue;
3 
4 // Licensed under the Boost license. Full copyright information in the AUTHORS file
5 
6 import mecca.containers.queue;
7 import mecca.lib.time;
8 import mecca.log;
9 import mecca.reactor.sync.semaphore : Semaphore;
10 
/**
 * Fixed-capacity queue meant to be used from reactor fibers.
 *
 * Where mecca.containers.queue asserts when an operation cannot be carried out, this wrapper suspends the calling fiber until the
 * operation becomes possible.
 *
 * Params:
 *  Type = type of the items stored in the queue.
 *  MaxItems = maximal number of items the queue can hold.
 */
struct BlockingQueue(Type, ushort MaxItems) {
private:
    alias QueueType = Queue!(Type, MaxItems);

    // Underlying storage. It is only touched after the matching semaphore was acquired.
    QueueType queue;
    // Gates push: acquired once per pushed item, released once per popped item.
    Semaphore syncPush = Semaphore(MaxItems);
    // Gates pop: acquired once per popped item, released once per pushed item.
    // NOTE(review): the second constructor argument presumably marks all MaxItems units as already taken, so that a fresh queue
    // reports empty - confirm against Semaphore.
    Semaphore syncPop = Semaphore(MaxItems, MaxItems);

public:
    /**
     * Reports whether the queue currently holds no items.
     *
     * A call to pop blocks if and only if empty is true.
     */
    @property bool empty() const pure nothrow @safe @nogc {
        const poppable = syncPop.level;
        return poppable == 0;
    }
    /**
     * Reports whether adding another item would block.
     *
     * A call to push blocks if and only if full is true.
     */
    @property bool full() const pure nothrow @safe @nogc {
        const pushable = syncPush.level;
        return pushable == 0;
    }

    /**
     * Append an item to the queue.
     *
     * Params:
     *  item = the item to append.
     *  timeout = how long to wait for room if the item cannot be added immediately.
     */
    @notrace void push(Type item, Timeout timeout = Timeout.infinite) @safe @nogc {
        // Reserve room first; nothing is stored unless this succeeds.
        syncPush.acquire(1, timeout);
        queue.push(item);
        // Account for the newly stored item on the pop side.
        syncPop.release();
    }
    /**
     * Append an uninitialized item to the queue.
     *
     * Prefer this form for items that are cheaper to initialize in place than to copy.
     *
     * Params:
     *  timeout = how long to wait for room if the item cannot be added immediately.
     *
     * Returns:
     *  A pointer to the newly added item, for the caller to fill in.
     */
    @notrace Type* push(Timeout timeout = Timeout.infinite) @safe @nogc {
        // Reserve room first; nothing is stored unless this succeeds.
        syncPush.acquire(1, timeout);
        auto slot = queue.push();
        // The pop side is released before the caller has filled the slot.
        syncPop.release();
        return slot;
    }

    /**
     * Remove a single item from the queue and return it.
     *
     * Params:
     *  timeout = how long to wait if no item is immediately available.
     */
    @notrace Type pop(Timeout timeout = Timeout.infinite) @safe @nogc {
        // Claim one queued item first; nothing is removed unless this succeeds.
        syncPop.acquire(1, timeout);
        auto item = queue.pop();
        // Hand the room that was just vacated back to the push side.
        syncPush.release();
        return item;
    }
    // @notrace ref const(Type) peek(Timeout timeout = Timeout.infinite) @safe @nogc {
    /*
     * Peek is not implemented, because there is almost no safe way of using it. The only safe way to use it is to hold a critical
     * section for as long as the reference is alive, since any sleep might invalidate the reference. As many implementations still
     * pop at the end of processing anyway, I (Shachar) have decided to leave out any implementation at all.
     */

    /**
     * Wait until all fibers currently waiting to push have done so.
     *
     * This method follows the principle that ugly functionality should have an ugly name. It is only reliably useful when it is known
     * that a single fiber pushes to the queue.
     *
     * The method waits until every fiber currently waiting to push has done so, and then waits for an empty slot to become available.
     * With a single pushing fiber, this guarantees that the next call to push does not have to sleep.
     *
     * If more than one fiber might push to the queue, no such guarantee exists, even when push is attempted immediately after this
     * method returns: fibers that asked to push after this fiber called the method are ahead of that future push in line.
     *
     * Params:
     *  timeout = how long to wait for the slot to become available.
     */
    void pushWaitersQueueWaitForHead(Timeout timeout = Timeout.infinite) @safe @nogc {
        // Take one unit of push capacity and return it right away; only the wait itself is of interest.
        syncPush.acquire(1, timeout);
        syncPush.release();
    }
}
114 
version(unittest) {
    import mecca.reactor;
    import mecca.lib.exception;

    /// Tests for BlockingQueue, run through the TEST_FIXTURE_REACTOR fixture mixed in below.
    class BlockingQueueTests {
        enum SIZE = 10;
        private BlockingQueue!(int, SIZE) queue;

        /// Pushes SIZE items without waiting, checking empty/full before, during and after.
        private void fillQueue() {
            assert(queue.empty);
            foreach (value; 0 .. SIZE) {
                assert(!queue.full);
                queue.push(value, Timeout.elapsed);
            }
            assert(queue.full);
        }

        /// Pops `total` items, reusing a single Timeout(10.seconds) value, then verifies that one more pop times out.
        private void popTotal(size_t total) {
            auto deadline = Timeout(10.seconds);
            foreach (_; 0 .. total) {
                queue.pop(deadline);
            }

            assertThrows!TimeoutExpired(queue.pop(Timeout(1.seconds)));
        }

        /**
         * Scenario shared by the two "multipleWaiting" tests.
         *
         * Fills the queue, spawns SIZE fibers that each push `value` onto the full queue, waits until all of them have started,
         * and then pops 2*SIZE items, expecting every pusher to have finished by then.
         */
        private void pushOntoFullQueue(int value) {
            fillQueue();

            ulong activePushers = 0;
            void pusher() {
                ++activePushers;
                scope(success) --activePushers;
                assert(queue.full);
                queue.push(value, Timeout(10.seconds));
            }

            foreach (_; 0 .. SIZE) {
                theReactor.spawnFiber(&pusher);
            }
            // Keep yielding until every spawned fiber has entered pusher().
            while (activePushers < SIZE) {
                theReactor.yield();
            }

            popTotal(SIZE * 2);
            assertEQ(activePushers, 0);
        }

        @mecca_ut void multipleWaitingToPush() {
            pushOntoFullQueue(1000);
        }

        // NOTE(review): this runs the very same scenario as multipleWaitingToPush, only with a different pushed value. It does not
        // call pushWaitersQueueWaitForHead - confirm whether that was the intent.
        @mecca_ut void multipleWaitingNotFull() {
            pushOntoFullQueue(2000);
        }

        /// A consumer fiber pops 2*SIZE items in order while this fiber pushes one item every 100 msecs.
        @mecca_ut void waitPop() {
            import mecca.reactor.sync.event : Event;
            Event done;

            void consumer() {
                foreach (expected; 0 .. 2*SIZE) {
                    assertEQ(expected, queue.pop(Timeout(500.msecs)));
                }
                done.set();
            }
            theReactor.spawnFiber(&consumer);

            foreach (value; 0 .. 2*SIZE) {
                theReactor.sleep(100.msecs);
                assert(!queue.full);
                queue.push(value);
            }
            done.wait(Timeout(1.seconds));
        }
    }

    mixin TEST_FIXTURE_REACTOR!BlockingQueueTests;
}