1 /// Run a job in a fiber, but never two simultaneously
2 module mecca.reactor.lib.ondemand_worker;
3 
4 // Licensed under the Boost license. Full copyright information in the AUTHORS file
5 
6 import std.traits;
7 
8 import mecca.lib.exception;
9 import mecca.lib.integers;
10 import mecca.lib.time: Timeout;
11 import mecca.log;
12 import mecca.reactor;
13 import mecca.reactor.fiber_group;
14 import mecca.reactor.sync.event: Signal;
15 
16 /// A fiber spawning delegate must be of this type
17 alias SpawnFiberDlg = FiberHandle delegate(void delegate() dlg) nothrow @safe @nogc;
18 
19 /// Exception thrown to abort a cancelled worker
20 class WorkerCancelled : FiberInterrupt {
21     mixin ExceptionBody!"OnDemandWorker task cancelled";
22 }
23 
24 /**
25 Run a job in a fiber, making sure never to run two simultaneously.
26 
27 This semantics is useful for deferred jobs that need to collect or parse a state that changes. The number of jobs is
28 not dependent on the number of state changes.
29 
30 The usage is to trigger the worker when the state changes. If no job is currently running, a fiber is immediately
31 launched to carry out the job. If the job is already running, it will trigger again once the current instance finishes.
32 */
33 struct OnDemandWorkerFunc(alias F) {
34 private:
35 
36     SpawnFiberDlg spawnFiber;
37     Signal done;
38     FiberHandle fiberHandle;
39     Serial32 requestGeneration, completedGeneration;
40     bool disabled, cancelAll;
41     ParameterTypeTuple!F args;
42     ParameterTypeTuple!F defaultArgs;
43 
44 public:
45     @disable this(this);
46 
47     /// Construct a worker
48     static if( ParameterTypeTuple!F.length>0 ) {
49         this(ParameterTypeTuple!F args, SpawnFiberDlg spawnFiberDlg = null) {
50             this.args = args;
51             this.defaultArgs = args;
52             this.spawnFiber = spawnFiberDlg;
53         }
54     } else {
55         // The above definition works for this case as well, but results in a constructor with default values to
56         // all arguments.
57         this(SpawnFiberDlg spawnFiberDlg) {
58             this.spawnFiber = spawnFiberDlg;
59         }
60     }
61 
62     /// ditto
63     this(ParameterTypeTuple!F args, FiberGroup* group) {
64         ASSERT!"The fiber group must not be null"(group !is null);
65         this(args, &group.spawnFiber);
66     }
67 
68     /// Helper for more verbose DEBUG logging
69     void DEBUG(string fmt, string file = __FILE_FULL_PATH__, string mod = __MODULE__, uint line = __LINE__, Args...)(Args args) {
70         .DEBUG!("#ONDEMAND(%s) worker: " ~ fmt, file, mod, line)(&this, args);
71     }
72 
73     /** Trigger an execution of the worker.
74 
75      If the worker is already executing, this will cause it to execute again once it completes. Otheriwse, opens a new
76      fiber and starts executing.
77      */
78     @notrace void run() nothrow @safe @nogc {
79         if(disabled) {
80             WARN!"#ONDEMAND(%s) worker is #DISABLED, not running!"(&this);
81             return;
82         }
83         if (spawned) {
84             requestGeneration++;
85             cancelAll = false;
86         } else {
87             assert(requestGeneration==completedGeneration);
88             requestGeneration++;
89 
90             if( spawnFiber is null ) {
91                 // We can't do that in the constructor because we want to allow static initialization
92                 spawnFiber = &theReactor.spawnFiber!(void delegate());
93             }
94 
95             fiberHandle = spawnFiber(&fib);
96             DEBUG!"spawned fiber %s"(fiberHandle.fiberId);
97         }
98     }
99     static if (ParameterTypeTuple!F.length > 0) {
100         /// ditto
101         void run(ParameterTypeTuple!F args) nothrow @safe @nogc {
102             this.args = args;
103             run();
104         }
105     }
106 
107     /** Wait for task queue to empty.
108      *
109      * This function waits until the worker fiber exits. This means that not only has the current 
110      *
111      * Note:
112      * Unless independently throttling requests, there is no guarantee that this condition will $(B ever) happen. 
113      */
114     void waitIdle(Timeout timeout = Timeout.infinite) @safe @nogc {
115         while( spawned )
116             done.wait(timeout);
117     }
118 
119     /// Wait for all $(I currently) pending tasks to complete
120     void waitComplete(Timeout timeout = Timeout.infinite) @safe @nogc {
121         auto targetGeneration = requestGeneration;
122         while( targetGeneration>completedGeneration ) {
123             done.wait(timeout);
124         }
125     }
126 
127     /// Reports whether a fiber is currently handling a request
128     @property public bool spawned() const nothrow @safe @nogc {
129         return fiberHandle.isValid;
130     }
131 
132     /// Cancel currently running tasks.
133     ///
134     /// Params:
135     /// currentOnly = Cancel only the currently running task. False (default) means to cancel the current task and also
136     /// all currently scheduled tasks. Setting to true means cancel only the currently running task. If another one is
137     /// scheduled, it will get carried out.
138     void cancel(bool currentOnly = false) nothrow @safe @nogc {
139         if( !spawned ) {
140             ASSERT!"Task fiber not running but a cancel is pending"(!cancelAll);
141             return;
142         }
143 
144         if( !currentOnly )
145             cancelAll = true;
146 
147         theReactor.throwInFiber!WorkerCancelled(fiberHandle);
148     }
149 
150     /** Disable the worker from receiving new jobs.
151      *
152      * This is useful in preperation for shutdown of the system. Disabling the worker $(B does not) terminate a current
153      * job, if one is running. Use `cancel` to do that.
154      */
155     void disable() nothrow @safe @nogc {
156         disabled = true;
157 
158         // Don't interrupt a running task, but prevent future ones from starting
159         if( spawned )
160             cancelAll = true;
161 
162         // Mark all pending tasks as done
163 
164         // This will get overridden by the fiber if a task is currently executing, but the cancelAll handling will do
165         // it again from within the fiber.
166         completedGeneration = requestGeneration;
167     }
168 
169     /// Enable a disabled worker.
170     void enable() nothrow @safe @nogc {
171         if( !disabled )
172             return;
173 
174         DBG_ASSERT!"Disabled and idle worker has incomplete tasks"(completedGeneration==requestGeneration);
175         ASSERT!"Cannot enable a disabled worker while a defunct task is still running"(!spawned);
176         disabled = false;
177         cancelAll = false;
178     }
179 private:
180     void fib() @notrace {
181         scope (exit) {
182             fiberHandle.reset();
183             cancelAll = false;
184         }
185 
186         try {
187             theReactor.setFiberName(fiberHandle, __traits(identifier, F), &F);
188 
189             bool firstTime = true;
190             do {
191                 scope(exit) done.signal();
192                 scope(exit) args = defaultArgs;
193 
194                 if( firstTime ) {
195                     firstTime = false;
196                 } else {
197                     theReactor.yield();
198                 }
199 
200                 auto targetGeneration = requestGeneration;
201                 scope(exit) completedGeneration = targetGeneration;
202 
203                 F(args);
204             } while (!cancelAll && requestGeneration>completedGeneration);
205         } catch(Throwable t) {
206             DEBUG!"becoming #DISABLED due to exception: %s"(t.msg);
207             disable();
208             throw t;
209         }
210 
211         if( cancelAll )
212             completedGeneration = requestGeneration;
213     }
214 }
215 
216 /// Same as `OnDemandWorkerFunc`, except with a delegate.
217 struct OnDemandWorkerDelegate {
218     OnDemandWorkerFunc!wrapper onDemandWorkerFunc;
219 
220     /// Construct a worker
221     this(void delegate() dg, SpawnFiberDlg spawnFiberDlg = null) @nogc @safe nothrow {
222         import mecca.lib.exception: DBG_ASSERT;
223         onDemandWorkerFunc = OnDemandWorkerFunc!wrapper(dg, spawnFiberDlg);
224     }
225 
226     this(void delegate() dg, FiberGroup* group) @nogc @safe nothrow {
227         ASSERT!"The fiber group must not be null"(group !is null);
228         this(dg, &group.spawnFiber);
229     }
230 
231     @notrace private static void wrapper(void delegate() dg) {
232         dg();
233     }
234 
235     @property bool isSet() {
236         return onDemandWorkerFunc.args[0] !is null;
237     }
238 
239     alias onDemandWorkerFunc this;
240     // XXX for OndemandWorkerDelegate we don't want to allow calling run(void delegate()) and replace the delegate.
241     //@disable void run(void delegate() dg) {}
242 }
243 
244 unittest {
245     import mecca.reactor.sync.event;
246 
247     Event blocker;
248     uint counter;
249 
250     void worker() {
251         blocker.wait();
252         counter++;
253     }
254 
255     auto odw = OnDemandWorkerDelegate(&worker);
256 
257     void waiter(uint expected) {
258         odw.waitComplete();
259         assertEQ(counter, expected, "Counter expected");
260     }
261 
262     void testBody() {
263         odw.run();
264         odw.run();
265         odw.run();
266 
267         theReactor.spawnFiber(&waiter, 1);
268         // as fast as you can
269         theReactor.yield();
270         theReactor.yield();
271 
272         odw.run();
273         odw.run();
274         odw.run();
275 
276         assertEQ(counter, 0, "worker ran with event reset");
277         blocker.set();
278         odw.waitComplete();
279         assertEQ(counter, 2, "counter-expected");
280     }
281 
282     testWithReactor(&testBody);
283 }
284 
285 unittest {
286     import mecca.reactor.sync.event;
287     Event blocker;
288     uint counter;
289 
290     void worker() {
291         blocker.wait();
292         counter++;
293     }
294 
295     auto odw = OnDemandWorkerDelegate(&worker);
296 
297     void testBody() {
298         odw.run();
299         theReactor.yield();
300         theReactor.yield();
301         odw.run();
302 
303         assertEQ(counter, 0, "Worker ran prematurely");
304 
305         blocker.set();
306         odw.disable();
307         odw.waitIdle();
308 
309         assertEQ(counter, 1, "Worker ran incorrect number of times");
310     }
311 
312     testWithReactor(&testBody);
313 }