module mecca.reactor.subsystems.threading;

// Licensed under the Boost license. Full copyright information in the AUTHORS file

import core.atomic;
import core.thread;
import core.sys.posix.signal;
import std.exception;

import mecca.platform.os: currentThreadId, ThreadId;
import mecca.lib.reflection;
import mecca.lib.exception;
import mecca.lib.time;
import mecca.lib.typedid: TypedIdentifier;

import mecca.containers.otm_queue: DuplexQueue;
import mecca.containers.arrays: FixedString;
import mecca.containers.pools: FixedPool;

import mecca.log;
import mecca.reactor: theReactor, FiberHandle, TimerHandle;


class WorkerThread: Thread {
    public import mecca.platform.os : BLOCKED_SIGNALS;

    __gshared void delegate(WorkerThread) preThreadFunc;

    align(8) ThreadId kernel_tid = -1;
    void delegate() dg;

    this(void delegate() dg, size_t stackSize = 0) {
        kernel_tid = -1;
        this.dg = dg;
        this.isDaemon = true;
        super(&wrapper, stackSize);
    }

    private void wrapper() nothrow {
        scope(exit) kernel_tid = -1;
        kernel_tid = currentThreadId();

        // block the reactor's signals so they are only delivered to the reactor thread
        sigset_t sigset = void;
        ASSERT!"sigemptyset failed"(sigemptyset(&sigset) == 0);
        foreach(sig; BLOCKED_SIGNALS) {
            ASSERT!"sigaddset(%s) failed"(sigaddset(&sigset, sig) == 0, sig);
        }
        static if (is(typeof(SIGRTMIN)) && is(typeof(SIGRTMAX)))
        {
            // block the entire real-time signal range, inclusive of SIGRTMAX
            foreach(sig; SIGRTMIN .. SIGRTMAX + 1) {
                ASSERT!"sigaddset(%s) failed"(sigaddset(&sigset, sig) == 0, sig);
            }
        }
        ASSERT!"pthread_sigmask failed"(pthread_sigmask(SIG_BLOCK, &sigset, null) == 0);

        try {
            if (preThreadFunc) {
                // set sched priority, move to CPU set
                preThreadFunc(this);
            }
            dg();
        }
        catch (Throwable ex) {
            try {
                import std.stdio;
                writeln(ex);
            } catch (Throwable) {}
            ASSERT!"WorkerThread threw %s(%s)"(false, typeid(ex).name, ex.msg);
            assert(false);
        }
    }
}
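
/*
 * A minimal sketch of the `preThreadFunc` hook (hypothetical caller, not part
 * of this module's API): the hook runs on every worker thread before its
 * delegate, which makes it the natural place for per-thread setup.
 *
 *     WorkerThread.preThreadFunc = (WorkerThread thd) {
 *         // e.g. set scheduling priority or CPU affinity,
 *         // addressing the kernel thread through thd.kernel_tid
 *     };
 */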

alias DeferredTaskCookie = TypedIdentifier!("DeferredTaskCookie", ulong, ulong.max, ulong.max);

struct DeferredTask {
    Closure taskClosure;
    Closure finiClosure;
    TscTimePoint timeAdded;
    TscTimePoint timeFinished;
    FiberHandle fibHandle;
    ExcBuf pendingException;

    union {
        void[128] result;
        struct {
            string excType;
            string excFile;
            size_t excLine;
            FixedString!80 excMsg;
        }
    }

    @property DeferredTaskCookie cookie() const pure @nogc nothrow {
        return DeferredTaskCookie(timeAdded.cycles);
    }

    @notrace void set(alias F, alias Fini = null)(Parameters!F args) {
        static if( !is( typeof(Fini) == typeof(null) ) ) {
            static assert(
                    is( Parameters!F == Parameters!Fini ), "Fini parameters must match callback parameters");
            static assert(
                    is( ReturnType!Fini == void ),
                    "Fini callback must be of type void, not " ~ ReturnType!Fini.stringof );
        }

        alias R = ReturnType!F;
        static if (is(R == void)) {
            taskClosure.set!F(args);
        }
        else {
            static assert (R.sizeof <= result.sizeof);
            static void wrapper(void* res, Parameters!F args) {
                *cast(R*)res = F(args);
            }
            taskClosure.set!wrapper(result.ptr, args);
        }

        static if( !is( typeof(Fini) == typeof(null) ) ) {
            finiClosure.set!Fini(args);
        }
    }

    void execute() {
        // called on worker thread
        if (!fibHandle.isValid()) {
            DEBUG!"#THD no fiber is waiting for %s"(cookie);
            return;
        }

        scope(exit) timeFinished = TscTimePoint.hardNow;
        pendingException = ExcBuf.init;
        try {
            DEBUG!"#THD running %s in thread"(cookie);
            taskClosure();
        }
        catch (Throwable ex) {
            pendingException.set(ex);
        }
    }

    @notrace void runFinalizer() nothrow {
        try {
            finiClosure();
        } catch(Exception ex) {
            ASSERT!"Thread finalizer should never throw. Threw \"%s\""(false, ex.msg);
        }
    }
}
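
/*
 * A minimal sketch of how a non-void result round-trips through a task
 * (hypothetical standalone snippet; `compute` is illustrative). set!F wraps
 * the callback so its return value lands in the `result` buffer, and
 * deferToThread later casts it back out. Note that execute() skips the work
 * when no fiber is waiting, so this sketch invokes the closure directly:
 *
 *     static long compute(int x) { return x * 10L; }
 *
 *     DeferredTask task;
 *     task.set!compute(7);    // captures &task.result, so the task must not move
 *     task.taskClosure();     // what execute() runs on a worker thread
 *     assert(*cast(long*)task.result.ptr == 70);
 */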

private extern(C) nothrow @system @nogc {
    import core.sys.posix.pthread: pthread_mutex_t, PTHREAD_MUTEX_INITIALIZER;

    // redeclared here because some druntime versions do not mark these as @nogc
    int pthread_mutex_lock(pthread_mutex_t*);
    int pthread_mutex_unlock(pthread_mutex_t*);
}

private struct PthreadMutex {
    pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;

    void lock() nothrow @nogc {
        ASSERT!"pthread_mutex_lock"(pthread_mutex_lock(&mtx) == 0);
    }
    void unlock() nothrow @nogc {
        ASSERT!"pthread_mutex_unlock"(pthread_mutex_unlock(&mtx) == 0);
    }
}
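
/*
 * A minimal usage sketch (hypothetical): lock() pairs with a scope(exit)
 * unlock(), the same pattern pullWork() uses below.
 *
 *     PthreadMutex mtx;
 *     mtx.lock();
 *     scope(exit) mtx.unlock();
 *     // ... critical section ...
 */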

struct ThreadPool(ushort numTasks) {
    enum MAX_FETCH_STREAK = 32;

private:
    alias PoolType = FixedPool!(DeferredTask, numTasks);
    alias IdxType = PoolType.IdxType;
    enum IdxType POISON = IdxType.max >> 1;
    static assert (numTasks < POISON);

    bool active;
    bool threadExited;
    shared long numActiveThreads;
    PthreadMutex pollerThreadMutex;
    Duration pollingInterval;
    WorkerThread[] threads;
    TimerHandle timerHandle;
    PoolType tasksPool;
    version(unittest) package ref auto utTasksPool() inout { return tasksPool; }
    DuplexQueue!(IdxType, numTasks) queue;

public:
    void open(uint numThreads, size_t stackSize = 0, Duration threadPollingInterval = 10.msecs,
              Duration reactorPollingInterval = 500.usecs) {
        pollerThreadMutex = PthreadMutex.init;
        pollingInterval = threadPollingInterval;
        numActiveThreads = 0;
        active = true;
        threadExited = false;
        tasksPool.open();
        queue.open(numThreads);
        threads.length = numThreads;

        foreach(ref thd; threads) {
            thd = new WorkerThread(&threadFunc, stackSize);
            thd.start();
        }
        // wait until every worker thread has checked in
        while (numActiveThreads < threads.length) {
            Thread.sleep(2.msecs);
        }
        timerHandle = theReactor.registerRecurringTimer(reactorPollingInterval, &completionCallback);
    }

    void close() {
        timerHandle.cancelTimer();
        active = false;
        // wake each worker with a poison request so it exits its loop
        foreach(i; 0 .. threads.length) {
            queue.submitRequest(POISON);
        }
        foreach(thd; threads) {
            thd.join();
        }
        destroy(queue);
        tasksPool.close();
    }
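
    /*
     * A minimal lifecycle sketch (hypothetical caller; when the reactor is
     * opened with threadDeferralEnabled it owns a pool and opens/closes it
     * itself). open() registers a recurring timer, so a running reactor is
     * assumed:
     *
     *     ThreadPool!1024 pool;
     *     pool.open(4);           // 4 worker threads, default polling intervals
     *     scope(exit) pool.close();
     */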

    private DeferredTask* pullWork() nothrow @nogc {
        // Only one thread at a time enters this function; the rest wait on
        // pollerThreadMutex. Once it fetches some work it releases the lock,
        // letting another thread take its place.
        pollerThreadMutex.lock();
        scope(exit) pollerThreadMutex.unlock();

        while (active) {
            IdxType idx;
            if (queue.pullRequest(idx)) {
                return idx == POISON ? null : tasksPool.fromIndex(idx);
            }
            else {
                Thread.sleep(pollingInterval);
            }
        }
        return null;
    }

    private void threadFunc() {
        atomicOp!"+="(numActiveThreads, 1);
        scope(exit) {
            atomicOp!"-="(numActiveThreads, 1);
            threadExited = true;
        }

        while (active) {
            auto task = pullWork();
            if (task is null || !active) {
                assert (!active);
                break;
            }

            task.execute();
            auto added = queue.submitResult(tasksPool.indexOf(task));
            ASSERT!"submitResult failed"(added);
        }
    }

    @notrace private void completionCallback() nothrow {
        assert (!threadExited);
        foreach(_; 0 .. MAX_FETCH_STREAK) {
            IdxType idx;
            if (!queue.pullResult(idx)) {
                break;
            }
            DeferredTask* task = tasksPool.fromIndex(idx);
            task.runFinalizer(); // call finalizer, if the user provided one

            DEBUG!"#THD pulled result of %s from thread"(task.cookie);
            if (task.fibHandle.isValid) {
                theReactor.resumeFiber(task.fibHandle);
                task.fibHandle = null;
            }
            else {
                // the fiber is no longer there to release it -- we must do it ourselves
                tasksPool.release(task);
            }
        }
    }

    auto deferToThread(alias F, alias Fini = null)(Timeout timeout, Parameters!F args) @nogc {
        static assert(
                is( typeof(Fini) == typeof(null) ) || hasFunctionAttributes!(Fini, "nothrow"),
                "Fini callback must be nothrow" );

        auto task = tasksPool.alloc();
        task.fibHandle = theReactor.currentFiberHandle;
        task.timeAdded = TscTimePoint.now();
        task.set!(F, Fini)(args);
        auto added = queue.submitRequest(tasksPool.indexOf(task));
        ASSERT!"submitRequest"(added);

        //
        // once submitted, the task no longer belongs (solely) to us. we go to sleep until either:
        //   * the completion callback fetched the task (suspendCurrentFiber returns)
        //   * the fiber was killed/timed out (suspendCurrentFiber throws)
        //      - note that the thread may or may not be done
        //      - if it is done, we must release it.
        //
        try {
            theReactor.suspendCurrentFiber(timeout);
        }
        catch (Throwable ex) {
            if (task.fibHandle.isValid) {
                // fiber was killed while thread still holds the task (or at least,
                // completionCallback hasn't fetched this task yet).
                // do NOT release, but mark defunct -- completionCallback will finalize and release
                task.fibHandle = null;
            }
            else {
                // thread is done with the task (completionCallback has already fetched it).
                // release it, since completionCallback won't do that any more.
                // the task is already finalized
                tasksPool.release(task);
            }
            throw ex;
        }

        // we reach this part if-and-only-if the thread is done with the task.
        // the task is already finalized
        scope(exit) tasksPool.release(task);

        Throwable ex = task.pendingException.get();
        if (ex !is null) {
            // We don't know what the lifetime of the task is, so copy the exception again
            throw setEx(ex);
        }

        static if (!is(ReturnType!F == void)) {
            auto tmp = *(cast(ReturnType!F*)task.result.ptr);
            return tmp;
        }
    }
}
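
/*
 * A minimal deferToThread sketch with a finalizer (hypothetical functions and
 * pool variable; application code normally goes through
 * theReactor.deferToThread, as the unittests below do). Fini must be nothrow,
 * take the same parameters as the callback, and runs on the reactor thread
 * even if the submitting fiber dies first:
 *
 *     static int blockingWork(int x) { return x * 2; }   // runs on a worker thread
 *     static void onDone(int x) nothrow {}               // runs on the reactor thread
 *
 *     int doubled = pool.deferToThread!(blockingWork, onDone)(Timeout.infinite, 21);
 *     assert(doubled == 42);
 */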

unittest {
    import mecca.reactor: Reactor, testWithReactor;

    __gshared long sum;
    __gshared long done;

    static int sleeper(Duration dur, int x) {
        Thread.sleep(dur);
        return x * 2;
    }

    static void sleeperFib(int x) {
        auto res = theReactor.deferToThread!sleeper(x.msecs, x);
        assert (res == x * 2);
        sum += x;
        done--;
    }

    Reactor.OpenOptions options;
    options.threadDeferralEnabled = true;

    testWithReactor({
        done = 0;
        foreach(int i; [10, 20, 30, 40, 50, 45, 35, 25, 15]) {
            done++;
            theReactor.spawnFiber(&sleeperFib, i);
        }

        // XXX: need semaphore
        while (done > 0) {
            theReactor.sleep(10.msecs);
        }

        assert (sum == 270);
    }, options);
}

unittest {
    import mecca.reactor;
    import mecca.reactor.sync.event;
    import mecca.reactor.types : ReactorExit;
    import std.traits;

    static struct Context {
        uint counter;
        shared bool inThread;
        Event started, done;

        void threadBody() {
            assert(!inThread, "inThread was already set when the thread body started");
            inThread = true;
            scope(exit) inThread = false;
            Thread.sleep(20.msecs);
        }

        static void proxyBody(Context* _this) {
            _this.threadBody();
        }

        void testFini() nothrow {
            counter++;
            done.set();
        }

        static void proxyFini(Context* _this) nothrow {
            return _this.testFini();
        }

        void testFiber() {
            started.set();
            DEBUG!"Deferring to thread"();
            theReactor.deferToThread!(proxyBody, proxyFini)(&this);
            assert(false, "Thread finished successfully when it shouldn't have");
        }
    }

    void testBody() {
        Context context;

        auto handle = theReactor.spawnFiber(&context.testFiber);

        context.started.wait();
        // Wait for the thread queue to pick up the new task
        theReactor.sleep(14.msecs);
        theReactor.throwInFiber!ReactorExit(handle);
        assert(context.counter==0);
        assert(context.inThread);
        context.done.wait(Timeout(50.msecs));
        assert(!context.inThread);
        assert(context.counter==1);
    }

    Reactor.OpenOptions options;
    options.threadDeferralEnabled = true;
    testWithReactor(&testBody, options);
}

unittest {
    // Make sure an exception thrown from a thread is forwarded as-is
    import mecca.reactor;

    static class SomeException : Exception {
        mixin ExceptionBody!"Just some exception";
    }

    static void threadBody() {
        throw mkEx!SomeException;
    }

    void testBody() {
        try {
            theReactor.deferToThread!threadBody();
        } catch(SomeException ex) {
            // Expecting this
        } catch(Throwable ex) {
            ASSERT!"Received wrong exception %s"(false, ex.msg);
        }
    }

    Reactor.OpenOptions options;
    options.threadDeferralEnabled = true;
    testWithReactor(&testBody, options);
}

unittest {
    // Make sure an exception thrown from a thread doesn't leak a task
    import mecca.reactor;

    static class SomeException : Exception {
        mixin ExceptionBody!"Just some exception";
    }

    static void threadBody() {
        throw mkEx!SomeException;
    }

    static void runThreadThatThrowsException() {
        try {
            theReactor.deferToThread!threadBody();
        } catch(SomeException ex) {
            // Expecting this
        }
    }

    Reactor.OpenOptions options;
    options.threadDeferralEnabled = true;

    testWithReactor({
        auto tasksAvailableInPool() {
            return theReactor.utThreadPool.utTasksPool.numAvailable();
        }

        // Verify the number of available tasks in the pool doesn't decrease
        const initialTasksAvailableInPool = tasksAvailableInPool();
        runThreadThatThrowsException();
        assertEQ(initialTasksAvailableInPool, tasksAvailableInPool());
    }, options);

    testWithReactor({
        // Run more deferred tasks than UT_MAX_DEFERRED_TASKS -- this crashes if tasks leak.
        // Run in concurrent chunks to avoid the test running for a very long time.
        import mecca.reactor.sync.barrier: Barrier;
        Barrier barrier;
        enum CHUNK_SIZE = 128;
        uint counter = 0;
        while (counter < Reactor.UT_MAX_DEFERRED_TASKS * 2) {
            foreach(_; 0..CHUNK_SIZE) {
                counter++;
                barrier.addWaiter();
                theReactor.spawnFiber({
                    runThreadThatThrowsException();
                    barrier.markDone();
                });
            }
            barrier.waitAll();
        }
    }, options);
}