/// Reactor aware queue that waits for operations to be possible
module mecca.reactor.sync.queue;

// Licensed under the Boost license. Full copyright information in the AUTHORS file

import mecca.containers.queue;
import mecca.lib.time;
import mecca.log;
import mecca.reactor.sync.semaphore : Semaphore;

/**
 * Fixed-capacity queue whose operations cooperate with the reactor.
 *
 * Where mecca.containers.queue asserts when an operation cannot be carried out, this queue suspends the calling fiber until
 * the operation can be completed.
 *
 * Params:
 *  Type = type of the items stored in the queue.
 *  MaxItems = maximal number of items the queue can hold.
 */
struct BlockingQueue(Type, ushort MaxItems) {
private:
    alias QueueType = Queue!(Type, MaxItems);

    // Underlying non-blocking storage.
    QueueType queue;
    // Gates producers: `full` reports true when its level drops to zero.
    Semaphore syncPush = Semaphore(MaxItems);
    // Gates consumers: `empty` reports true when its level is zero, which is the case for a fresh queue.
    Semaphore syncPop = Semaphore(MaxItems, MaxItems);

public:
    /**
     * Tells whether the queue currently holds no items.
     *
     * A call to pop blocks if, and only if, empty is true.
     */
    @property bool empty() const pure nothrow @safe @nogc {
        return syncPop.level == 0;
    }

    /**
     * Tells whether adding another item would have to wait.
     *
     * A call to push blocks if, and only if, full is true.
     */
    @property bool full() const pure nothrow @safe @nogc {
        return syncPush.level == 0;
    }

    /**
     * Append an item to the queue.
     *
     * Params:
     *  item = the item to append.
     *  timeout = how long to wait for room when the item cannot be added immediately.
     */
    @notrace void push(Type item, Timeout timeout = Timeout.infinite) @safe @nogc {
        // Reserve a slot first; this is the only step that may put the fiber to sleep.
        syncPush.acquire(1, timeout);
        queue.push(item);
        // Announce the new item to consumers.
        syncPop.release();
    }

    /**
     * Append an uninitialized item to the queue.
     *
     * Meant for item types that are cheaper to construct in place than to copy.
     *
     * NOTE(review): syncPop is released before the caller has had a chance to fill the slot, so the caller presumably has to
     * finish initializing the item before giving up the CPU - confirm.
     *
     * Params:
     *  timeout = how long to wait for room when the item cannot be added immediately.
     *
     * Returns:
     *  A pointer to the newly added item, through which its value can be filled in.
     */
    @notrace Type* push(Timeout timeout = Timeout.infinite) @safe @nogc {
        syncPush.acquire(1, timeout);
        auto slot = queue.push();
        syncPop.release();
        return slot;
    }

    /**
     * Remove a single item from the queue and return it.
     *
     * The unit tests of this module expect TimeoutExpired to be thrown when the timeout runs out with nothing to pop.
     *
     * Params:
     *  timeout = how long to wait when no item is immediately available.
     */
    @notrace Type pop(Timeout timeout = Timeout.infinite) @safe @nogc {
        // Wait for an item to become available.
        syncPop.acquire(1, timeout);
        auto popped = queue.pop();
        // The slot is free again; let a producer in.
        syncPush.release();
        return popped;
    }

    // @notrace ref const(Type) peek(Timeout timeout = Timeout.infinite) @safe @nogc {
    /*
     * Peek is intentionally not implemented, because there is almost no safe way of using it. The only safe way is to hold a
     * critical section for as long as the reference is alive, since any sleep might invalidate the reference. As many
     * implementations still pop at the end of processing anyway, I (Shachar) have decided to leave out any implementation at all.
     */

    /**
     * Wait until every fiber currently waiting to push has done so.
     *
     * This method follows the principle that ugly functionality should have an ugly name. It is only reliably useful when you
     * know that a single fiber is the only one pushing to the queue.
     *
     * The method waits for all fibers that are currently waiting to push to complete, and then for a free slot to open up. With
     * a single pushing fiber, this guarantees that the next call to push does not need to sleep.
     *
     * If more than one fiber might be pushing, no such guarantee exists, not even for a push attempted immediately after this
     * method returns: fibers that asked to push after this fiber called the method are ahead of that future push in line.
     *
     * Params:
     *  timeout = how long to wait for the free slot.
     */
    void pushWaitersQueueWaitForHead(Timeout timeout = Timeout.infinite) @safe @nogc {
        // Take a slot and hand it straight back; nothing is added to the queue.
        syncPush.acquire(1, timeout);
        syncPush.release();
    }
}

version(unittest) {
    import mecca.reactor;
    import mecca.lib.exception;

    class BlockingQueueTests {
        enum SIZE = 10;
        private BlockingQueue!(int, SIZE) queue;

        // Pushes SIZE items without waiting, checking the empty/full indications along the way.
        private void fillQueue() {
            assert(queue.empty);
            foreach (value; 0 .. SIZE) {
                assert(!queue.full);
                queue.push(value, Timeout.elapsed);
            }
            assert(queue.full);
        }

        // Pops exactly `total` items, then verifies that one further pop times out.
        private void popTotal(size_t total) {
            auto deadline = Timeout(10.seconds);
            foreach (_; 0 .. total) {
                queue.pop(deadline);
            }

            assertThrows!TimeoutExpired(queue.pop(Timeout(1.seconds)));
        }

        // Shared scenario: fill the queue, start SIZE fibers that each push `value` onto the full queue, then drain
        // everything and check that all the fibers have finished.
        private void drainWithBlockedPushers(int value) {
            fillQueue();

            ulong numRunning = 0;
            void pusherFib() {
                numRunning++;
                scope(success) numRunning--;
                assert(queue.full);
                queue.push(value, Timeout(10.seconds));
            }

            foreach (_; 0 .. SIZE) {
                theReactor.spawnFiber(&pusherFib);
            }
            // Let every pusher start running before draining.
            while (numRunning < SIZE) {
                theReactor.yield();
            }

            popTotal(SIZE * 2);
            assertEQ(numRunning, 0);
        }

        @mecca_ut void multipleWaitingToPush() {
            drainWithBlockedPushers(1000);
        }

        @mecca_ut void multipleWaitingNotFull() {
            drainWithBlockedPushers(2000);
        }

        @mecca_ut void waitPop() {
            import mecca.reactor.sync.event : Event;

            enum numItems = 2 * SIZE;
            Event done;

            // Consumer: expects the items in the order they were pushed, each arriving within the pop timeout.
            void consumerFib() {
                foreach (expected; 0 .. numItems) {
                    assertEQ(expected, queue.pop(Timeout(500.msecs)));
                }
                done.set();
            }
            theReactor.spawnFiber(&consumerFib);

            // Producer: one item every 100ms; the queue should never be full at the time of the push.
            foreach (value; 0 .. numItems) {
                theReactor.sleep(100.msecs);
                assert(!queue.full);
                queue.push(value);
            }
            done.wait(Timeout(1.seconds));
        }
    }

    mixin TEST_FIXTURE_REACTOR!BlockingQueueTests;
}