/// Run a job in a fiber, but never two simultaneously
module mecca.reactor.lib.ondemand_worker;

// Licensed under the Boost license. Full copyright information in the AUTHORS file

import std.traits;

import mecca.lib.exception;
import mecca.lib.integers;
import mecca.lib.time: Timeout;
import mecca.log;
import mecca.reactor;
import mecca.reactor.fiber_group;
import mecca.reactor.sync.event: Signal;

/// A fiber spawning delegate must be of this type
alias SpawnFiberDlg = FiberHandle delegate(void delegate() dlg) nothrow @safe @nogc;

/// Exception thrown to abort a cancelled worker
class WorkerCancelled : FiberInterrupt {
    mixin ExceptionBody!"OnDemandWorker task cancelled";
}

/**
  Run a job in a fiber, making sure never to run two simultaneously.

  These semantics are useful for deferred jobs that need to collect or parse a state that changes. The number of jobs
  is not dependent on the number of state changes.

  The usage is to trigger the worker when the state changes. If no job is currently running, a fiber is immediately
  launched to carry out the job. If the job is already running, it will trigger again once the current instance
  finishes.
 */
struct OnDemandWorkerFunc(alias F) {
private:

    // Delegate used to spawn the worker fiber. Lazily defaults to the reactor's spawnFiber (see run()).
    SpawnFiberDlg spawnFiber;
    // Signalled at the end of every job iteration; waitIdle/waitComplete block on it.
    Signal done;
    // Handle of the worker fiber. It is reset when the fiber exits, so "valid" means "spawned".
    FiberHandle fiberHandle;
    // requestGeneration is bumped by every accepted run(). completedGeneration is set, when a job ends, to the
    // request generation that was current when that job started.
    Serial32 requestGeneration, completedGeneration;
    // disabled: run() requests are rejected. cancelAll: the worker fiber must exit instead of running pending jobs.
    bool disabled, cancelAll;
    // Arguments for the next invocation of F.
    ParameterTypeTuple!F args;
    // Arguments given at construction; `args` is reset to these after every job iteration.
    ParameterTypeTuple!F defaultArgs;

public:
    @disable this(this);

    /// Construct a worker
    static if( ParameterTypeTuple!F.length>0 ) {
        this(ParameterTypeTuple!F args, SpawnFiberDlg spawnFiberDlg = null) {
            this.args = args;
            this.defaultArgs = args;
            this.spawnFiber = spawnFiberDlg;
        }
    } else {
        // The above definition works for this case as well, but results in a constructor with default values to
        // all arguments.
        this(SpawnFiberDlg spawnFiberDlg) {
            this.spawnFiber = spawnFiberDlg;
        }
    }

    /// ditto
    this(ParameterTypeTuple!F args, FiberGroup* group) {
        ASSERT!"The fiber group must not be null"(group !is null);
        this(args, &group.spawnFiber);
    }

    /// Helper for more verbose DEBUG logging
    ///
    /// Prefixes the message with an `#ONDEMAND` tag and passes the address of this worker as the first argument.
    void DEBUG(string fmt, string file = __FILE_FULL_PATH__, string mod = __MODULE__, uint line = __LINE__, Args...)(Args args) {
        .DEBUG!("#ONDEMAND(%s) worker: " ~ fmt, file, mod, line)(&this, args);
    }

    /** Trigger an execution of the worker.

      If the worker is already executing, this will cause it to execute again once it completes. Otherwise, opens a
      new fiber and starts executing.

      If the worker is disabled, a warning is logged and the request is dropped.
     */
    @notrace void run() nothrow @safe @nogc {
        if(disabled) {
            WARN!"#ONDEMAND(%s) worker is #DISABLED, not running!"(&this);
            return;
        }
        if (spawned) {
            // A fiber already exists: just record the request. A fresh request also revokes a pending "cancel all".
            requestGeneration++;
            cancelAll = false;
        } else {
            assert(requestGeneration==completedGeneration);
            requestGeneration++;

            if( spawnFiber is null ) {
                // We can't do that in the constructor because we want to allow static initialization
                spawnFiber = &theReactor.spawnFiber!(void delegate());
            }

            fiberHandle = spawnFiber(&fib);
            DEBUG!"spawned fiber %s"(fiberHandle.fiberId);
        }
    }

    static if (ParameterTypeTuple!F.length > 0) {
        /// ditto
        ///
        /// This overload also replaces the arguments that will be passed to the job.
        void run(ParameterTypeTuple!F args) nothrow @safe @nogc {
            this.args = args;
            run();
        }
    }

    /** Wait for task queue to empty.
     *
     * This function waits until the worker fiber exits. This means that the current job has ended and no further
     * job is left to run (either none was requested, or the pending ones were cancelled).
     *
     * Note:
     * Unless independently throttling requests, there is no guarantee that this condition will $(B ever) happen.
     */
    void waitIdle(Timeout timeout = Timeout.infinite) @safe @nogc {
        while( spawned )
            done.wait(timeout);
    }

    /// Wait for all $(I currently) pending tasks to complete
    ///
    /// Requests made after this call started waiting are not waited for.
    void waitComplete(Timeout timeout = Timeout.infinite) @safe @nogc {
        // Snapshot the generation, so that later requests do not extend the wait
        auto targetGeneration = requestGeneration;
        while( targetGeneration>completedGeneration ) {
            done.wait(timeout);
        }
    }

    /// Reports whether a fiber is currently handling a request
    @property public bool spawned() const nothrow @safe @nogc {
        return fiberHandle.isValid;
    }

    /// Cancel currently running tasks.
    ///
    /// A `WorkerCancelled` exception is thrown inside the worker fiber. The worker itself stays enabled.
    ///
    /// Params:
    /// currentOnly = Cancel only the currently running task. False (default) means to cancel the current task and
    /// also all currently scheduled tasks. Setting to true means cancel only the currently running task. If another
    /// one is scheduled, it will get carried out.
    void cancel(bool currentOnly = false) nothrow @safe @nogc {
        if( !spawned ) {
            ASSERT!"Task fiber not running but a cancel is pending"(!cancelAll);
            return;
        }

        if( !currentOnly )
            cancelAll = true;

        theReactor.throwInFiber!WorkerCancelled(fiberHandle);
    }

    /** Disable the worker from receiving new jobs.
     *
     * This is useful in preparation for shutdown of the system. Disabling the worker $(B does not) terminate a
     * current job, if one is running. Use `cancel` to do that.
     */
    void disable() nothrow @safe @nogc {
        disabled = true;

        // Don't interrupt a running task, but prevent future ones from starting
        if( spawned )
            cancelAll = true;

        // Mark all pending tasks as done

        // This will get overridden by the fiber if a task is currently executing, but the cancelAll handling will do
        // it again from within the fiber.
        completedGeneration = requestGeneration;
    }

    /// Enable a disabled worker.
    ///
    /// Does nothing if the worker is not disabled. It is an error to call this while the worker fiber is still
    /// running.
    void enable() nothrow @safe @nogc {
        if( !disabled )
            return;

        DBG_ASSERT!"Disabled and idle worker has incomplete tasks"(completedGeneration==requestGeneration);
        ASSERT!"Cannot enable a disabled worker while a defunct task is still running"(!spawned);
        disabled = false;
        cancelAll = false;
    }

private:
    // Body of the worker fiber: runs F repeatedly for as long as there are uncompleted requests.
    void fib() @notrace {
        scope (exit) {
            // Whatever the reason for leaving, the worker is no longer spawned and no cancel is pending
            fiberHandle.reset();
            cancelAll = false;
        }

        try {
            theReactor.setFiberName(fiberHandle, __traits(identifier, F), &F);

            bool firstTime = true;
            do {
                // Scope guards run in reverse order: the generation is updated first, then the arguments are
                // reset, and only then are the waiters signalled.
                scope(exit) done.signal();
                // NOTE(review): this also overwrites arguments handed to run(args) while the job was executing,
                // so a re-run triggered that way uses defaultArgs -- confirm this is intended.
                scope(exit) args = defaultArgs;

                try {
                    if( firstTime ) {
                        firstTime = false;
                    } else {
                        // Let other fibers run between consecutive jobs
                        theReactor.yield();
                    }

                    // All requests made so far are considered served by this execution
                    auto targetGeneration = requestGeneration;
                    scope(exit) completedGeneration = targetGeneration;

                    F(args);
                } catch(WorkerCancelled) {
                    // cancel() aborts the current task only. Whether pending tasks still run is decided by the
                    // loop condition below (cancelAll). Without this handler the cancellation reached the generic
                    // handler, which disabled the worker and made `currentOnly` meaningless.
                    DEBUG!"current task cancelled"();
                }
            } while (!cancelAll && requestGeneration>completedGeneration);
        } catch(Throwable t) {
            DEBUG!"becoming #DISABLED due to exception: %s"(t.msg);
            disable();
            throw t;
        }

        // Leaving due to "cancel all" or disable(): pending requests are dropped, so report them as done
        if( cancelAll )
            completedGeneration = requestGeneration;
    }
}

/// Same as `OnDemandWorkerFunc`, except with a delegate.
struct OnDemandWorkerDelegate {
    // The actual worker. Its single job argument is the delegate to call.
    OnDemandWorkerFunc!wrapper onDemandWorkerFunc;

    /// Construct a worker
    ///
    /// Params:
    /// dg = The delegate to run as the job.
    /// spawnFiberDlg = Delegate used for spawning the worker fiber. Forwarded as is to `OnDemandWorkerFunc`.
    this(void delegate() dg, SpawnFiberDlg spawnFiberDlg = null) @nogc @safe nothrow {
        onDemandWorkerFunc = OnDemandWorkerFunc!wrapper(dg, spawnFiberDlg);
    }

    /// ditto
    ///
    /// This form spawns the worker fiber through `group`, which must not be null.
    this(void delegate() dg, FiberGroup* group) @nogc @safe nothrow {
        ASSERT!"The fiber group must not be null"(group !is null);
        this(dg, &group.spawnFiber);
    }

    // Adapter turning the stored delegate into the function `OnDemandWorkerFunc` is instantiated with.
    @notrace private static void wrapper(void delegate() dg) {
        dg();
    }

    /// Reports whether the worker currently holds a non-null delegate
    @property bool isSet() {
        return onDemandWorkerFunc.args[0] !is null;
    }

    alias onDemandWorkerFunc this;
    // XXX for OndemandWorkerDelegate we don't want to allow calling run(void delegate()) and replace the delegate.
    //@disable void run(void delegate() dg) {}
}

unittest {
    import mecca.reactor.sync.event;

    Event blocker;
    uint counter;

    // The job blocks until the event is set, then counts one execution
    void worker() {
        blocker.wait();
        counter++;
    }

    auto odw = OnDemandWorkerDelegate(&worker);

    void waiter(uint expected) {
        odw.waitComplete();
        assertEQ(counter, expected, "Counter expected");
    }

    void testBody() {
        // The first batch of triggers is expected to result in a single execution
        odw.run();
        odw.run();
        odw.run();

        theReactor.spawnFiber(&waiter, 1);
        // as fast as you can
        theReactor.yield();
        theReactor.yield();

        // Triggers arriving while the job is blocked are expected to result in exactly one more execution
        odw.run();
        odw.run();
        odw.run();

        assertEQ(counter, 0, "worker ran with event reset");
        blocker.set();
        odw.waitComplete();
        assertEQ(counter, 2, "counter-expected");
    }

    testWithReactor(&testBody);
}

unittest {
    import mecca.reactor.sync.event;
    Event blocker;
    uint counter;

    // The job blocks until the event is set, then counts one execution
    void worker() {
        blocker.wait();
        counter++;
    }

    auto odw = OnDemandWorkerDelegate(&worker);

    void testBody() {
        odw.run();
        theReactor.yield();
        theReactor.yield();
        // Requested while the first job is blocked; disable() below is expected to drop it
        odw.run();

        assertEQ(counter, 0, "Worker ran prematurely");

        blocker.set();
        odw.disable();
        odw.waitIdle();

        assertEQ(counter, 1, "Worker ran incorrect number of times");
    }

    testWithReactor(&testBody);
}