module mecca.reactor.subsystems.threading;

// Licensed under the Boost license. Full copyright information in the AUTHORS file

import core.atomic;
import core.thread;
import core.sys.posix.signal;
import std.exception;

import mecca.platform.os: currentThreadId, ThreadId;
import mecca.lib.reflection;
import mecca.lib.exception;
import mecca.lib.time;
import mecca.lib.typedid: TypedIdentifier;

import mecca.containers.otm_queue: DuplexQueue;
import mecca.containers.arrays: FixedString;
import mecca.containers.pools: FixedPool;

import mecca.log;
import mecca.reactor: theReactor, FiberHandle, TimerHandle;


class WorkerThread: Thread {
    public import mecca.platform.os : BLOCKED_SIGNALS;

    __gshared static void delegate(WorkerThread) preThreadFunc;

    align(8) ThreadId kernel_tid = -1;
    void delegate() dg;

    this(void delegate() dg, size_t stackSize = 0) {
        kernel_tid = -1;
        this.dg = dg;
        this.isDaemon = true;
        super(&wrapper, stackSize);
    }

    private void wrapper() nothrow {
        scope(exit) kernel_tid = -1;
        kernel_tid = currentThreadId();

        // block delivery of the reactor's signals to this thread
        sigset_t sigset = void;
        ASSERT!"sigemptyset failed"(sigemptyset(&sigset) == 0);
        foreach(sig; BLOCKED_SIGNALS) {
            ASSERT!"sigaddset(%s) failed"(sigaddset(&sigset, sig) == 0, sig);
        }
        static if (is(typeof(SIGRTMIN)) && is(typeof(SIGRTMAX)))
        {
            // ".." excludes the upper bound, so add 1 to also block SIGRTMAX itself
            foreach(sig; SIGRTMIN .. SIGRTMAX + 1) {
                ASSERT!"sigaddset(%s) failed"(sigaddset(&sigset, sig) == 0, sig);
            }
        }
        ASSERT!"pthread_sigmask failed"(pthread_sigmask(SIG_BLOCK, &sigset, null) == 0);

        try {
            if (preThreadFunc) {
                // set sched priority, move to CPU set
                preThreadFunc(this);
            }
            dg();
        }
        catch (Throwable ex) {
            try {
                import std.stdio;
                writeln(ex);
            } catch (Throwable) {}
            ASSERT!"WorkerThread threw %s(%s)"(false, typeid(ex).name, ex.msg);
            assert(false);
        }
    }
}

// cookie identifying a task by its submission timestamp (TSC cycles)
alias DeferredTaskCookie = TypedIdentifier!("DeferredTaskCookie", ulong, ulong.max, ulong.max);

struct DeferredTask {
    Closure taskClosure;
    Closure finiClosure;
    TscTimePoint timeAdded;
    TscTimePoint timeFinished;
    FiberHandle fibHandle;
    ExcBuf pendingException;

    union {
        void[128] result;
        struct {
            string excType;
            string excFile;
            size_t excLine;
            FixedString!80 excMsg;
        }
    }

    @property DeferredTaskCookie cookie() const pure @nogc nothrow {
        return DeferredTaskCookie(timeAdded.cycles);
    }

    @notrace void set(alias F, alias Fini = null)(Parameters!F args) {
        static if( !is( typeof(Fini) == typeof(null) ) ) {
            static assert(
                    is( Parameters!F == Parameters!Fini ), "Fini parameters must match callback parameters");
            static assert(
                    is( ReturnType!Fini == void ),
                    "Fini callback must be of type void, not " ~ ReturnType!Fini.stringof );
        }

        alias R = ReturnType!F;
        static if (is(R == void)) {
            taskClosure.set!F(args);
        }
        else {
            static assert (R.sizeof <= result.sizeof, "Return type too big for the task's result buffer");
            static void wrapper(void* res, Parameters!F args) {
                *cast(R*)res = F(args);
            }
            taskClosure.set!wrapper(result.ptr, args);
        }

        static if( !is( typeof(Fini) == typeof(null) ) ) {
            finiClosure.set!Fini(args);
        }
    }

    void execute() {
        // called on a worker thread
        if (!fibHandle.isValid()) {
            DEBUG!"#THD no fiber is waiting for %s"(cookie);
            return;
        }

        scope(exit) timeFinished = TscTimePoint.hardNow;
        pendingException = ExcBuf.init;
        try {
            DEBUG!"#THD running %s in thread"(cookie);
            taskClosure();
        }
        catch (Throwable ex) {
            pendingException.set(ex);
        }
    }

    @notrace void runFinalizer() nothrow {
        try {
            finiClosure();
        } catch(Exception ex) {
            ASSERT!"Thread finalizer should never throw. Threw \"%s\""(false, ex.msg);
        }
    }
}

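// A minimal, self-contained sketch (not part of the pool machinery) of the wrapper
// trick DeferredTask.set!F uses above: a value-returning callback is adapted to a
// uniform void-returning signature by writing its result through an untyped buffer,
// which is how the value later comes back out of DeferredTask.result.
unittest {
    static int twice(int x) {
        return x * 2;
    }

    // stand-in for DeferredTask.result; ulong guarantees suitable alignment
    ulong[2] buffer;

    static void wrapper(void* res, int x) {
        // store the callback's return value into the caller-provided buffer
        *cast(int*)res = twice(x);
    }

    wrapper(buffer.ptr, 21);
    assert(*cast(int*)buffer.ptr == 42);
}
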
private extern(C) nothrow @system @nogc {
    import core.sys.posix.pthread: pthread_mutex_t, PTHREAD_MUTEX_INITIALIZER;

    // these are not marked as @nogc in some versions of druntime
    int pthread_mutex_lock(pthread_mutex_t*);
    int pthread_mutex_unlock(pthread_mutex_t*);
}

private struct PthreadMutex {
    pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;

    void lock() nothrow @nogc {
        ASSERT!"pthread_mutex_lock"(pthread_mutex_lock(&mtx) == 0);
    }
    void unlock() nothrow @nogc {
        ASSERT!"pthread_mutex_unlock"(pthread_mutex_unlock(&mtx) == 0);
    }
}

struct ThreadPool(ushort numTasks) {
    // maximum number of results drained per completionCallback invocation
    enum MAX_FETCH_STREAK = 32;

private:
    alias PoolType = FixedPool!(DeferredTask, numTasks);
    alias IdxType = PoolType.IdxType;
    // sentinel request index used by close() to wake workers up for shutdown
    enum IdxType POISON = IdxType.max >> 1;
    static assert (numTasks < POISON);

    bool active;
    bool threadExited;
    shared long numActiveThreads;
    PthreadMutex pollerThreadMutex;
    Duration pollingInterval;
    WorkerThread[] threads;
    TimerHandle timerHandle;
    PoolType tasksPool;
    version(unittest) package ref auto utTasksPool() inout { return tasksPool; }
    DuplexQueue!(IdxType, numTasks) queue;

public:
    void open(uint numThreads, size_t stackSize = 0, Duration threadPollingInterval = 10.msecs,
              Duration reactorPollingInterval = 500.usecs) {
        pollerThreadMutex = PthreadMutex.init;
        pollingInterval = threadPollingInterval;
        numActiveThreads = 0;
        active = true;
        threadExited = false;
        tasksPool.open();
        queue.open(numThreads);
        threads.length = numThreads;

        foreach(ref thd; threads) {
            thd = new WorkerThread(&threadFunc, stackSize);
            thd.start();
        }
        while (atomicLoad(numActiveThreads) < threads.length) {
            Thread.sleep(2.msecs);
        }
        timerHandle = theReactor.registerRecurringTimer(reactorPollingInterval, &completionCallback);
    }

    void close() {
        timerHandle.cancelTimer();
        active = false;
        foreach(i; 0 .. threads.length) {
            queue.submitRequest(POISON);
        }
        foreach(thd; threads) {
            thd.join();
        }
        destroy(queue);
        tasksPool.close();
    }

    private DeferredTask* pullWork() nothrow @nogc {
        // Only one thread at a time enters this function; the rest wait on pollerThreadMutex.
        // Once a thread fetches some work it releases the lock, and another thread enters.
        pollerThreadMutex.lock();
        scope(exit) pollerThreadMutex.unlock();

        while (active) {
            IdxType idx;
            if (queue.pullRequest(idx)) {
                return idx == POISON ? null : tasksPool.fromIndex(idx);
            }
            else {
                Thread.sleep(pollingInterval);
            }
        }
        return null;
    }

    private void threadFunc() {
        atomicOp!"+="(numActiveThreads, 1);
        scope(exit) {
            atomicOp!"-="(numActiveThreads, 1);
            threadExited = true;
        }

        while (active) {
            auto task = pullWork();
            if (task is null || !active) {
                assert (!active);
                break;
            }

            task.execute();
            auto added = queue.submitResult(tasksPool.indexOf(task));
            ASSERT!"submitResult failed"(added);
        }
    }

    @notrace private void completionCallback() nothrow {
        assert (!threadExited);
        foreach(_; 0 .. MAX_FETCH_STREAK) {
            IdxType idx;
            if (!queue.pullResult(idx)) {
                break;
            }
            DeferredTask* task = tasksPool.fromIndex(idx);
            task.runFinalizer(); // call the finalizer, if the user provided one

            DEBUG!"#THD pulled result of %s from thread"(task.cookie);
            if (task.fibHandle.isValid) {
                theReactor.resumeFiber(task.fibHandle);
                task.fibHandle = null;
            }
            else {
                // the fiber is no longer there to release the task -- we must do it ourselves
                tasksPool.release(task);
            }
        }
    }

    auto deferToThread(alias F, alias Fini = null)(Timeout timeout, Parameters!F args) @nogc {
        static assert(
                is( typeof(Fini) == typeof(null) ) || hasFunctionAttributes!(Fini, "nothrow"),
                "Fini callback must be nothrow" );

        auto task = tasksPool.alloc();
        task.fibHandle = theReactor.currentFiberHandle;
        task.timeAdded = TscTimePoint.now();
        task.set!(F, Fini)(args);
        auto added = queue.submitRequest(tasksPool.indexOf(task));
        ASSERT!"submitRequest failed"(added);

        //
        // once submitted, the task no longer belongs (solely) to us. we go to sleep until either:
        // * the completion callback fetched the task (suspendCurrentFiber returns)
        // * the fiber was killed/timed out (suspendCurrentFiber throws)
        //   - note that the thread may or may not be done
        //   - if it is done, we must release the task
        //
        try {
            theReactor.suspendCurrentFiber(timeout);
        }
        catch (Throwable ex) {
            if (task.fibHandle.isValid) {
                // The fiber was killed while the thread still holds the task (or, at least,
                // completionCallback hasn't fetched this task yet).
                // Do NOT release; mark the task defunct -- completionCallback will finalize and release it.
                task.fibHandle = null;
            }
            else {
                // The thread is done with the task (completionCallback has already fetched it).
                // Release it, since completionCallback won't do that any more.
                // The task is already finalized.
                tasksPool.release(task);
            }
            throw ex;
        }

        // we reach this part if-and-only-if the thread is done with the task.
        // the task is already finalized
        scope(exit) tasksPool.release(task);

        Throwable ex = task.pendingException.get();
        if (ex !is null) {
            // We don't know what the lifetime of the task is, so copy the exception again
            throw setEx(ex);
        }

        static if (!is(ReturnType!F == void)) {
            auto tmp = *(cast(ReturnType!F*)task.result.ptr);
            return tmp;
        }
    }
}

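// A minimal, self-contained sketch of the poison-pill shutdown pattern used by
// close() and pullWork() above, written with std.concurrency message passing
// instead of the pool's DuplexQueue: the closer enqueues one sentinel per worker,
// and each worker exits its loop when it dequeues the sentinel.
unittest {
    import std.concurrency: receiveOnly, send, spawn, Tid, thisTid;

    enum int POISON_PILL = int.max; // plays the role of the POISON index above

    static void worker(Tid owner) {
        int processed;
        for (;;) {
            immutable msg = receiveOnly!int();
            if (msg == POISON_PILL)
                break; // shutdown requested
            ++processed;
        }
        owner.send(processed);
    }

    auto tid = spawn(&worker, thisTid);
    tid.send(1);
    tid.send(2);
    tid.send(POISON_PILL);
    assert(receiveOnly!int() == 2); // both real items were processed before shutdown
}
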
unittest {
    import mecca.reactor: Reactor, testWithReactor;

    __gshared static long sum;
    __gshared static long done;

    static int sleeper(Duration dur, int x) {
        Thread.sleep(dur);
        return x * 2;
    }

    static void sleeperFib(int x) {
        auto res = theReactor.deferToThread!sleeper(x.msecs, x);
        assert (res == x * 2);
        sum += x;
        done--;
    }

    Reactor.OpenOptions options;
    options.threadDeferralEnabled = true;

    testWithReactor({
        done = 0;
        foreach(int i; [10, 20, 30, 40, 50, 45, 35, 25, 15]) {
            done++;
            theReactor.spawnFiber(&sleeperFib, i);
        }

        // XXX: need a semaphore instead of polling (see the Barrier-based sketch below)
        while (done > 0) {
            theReactor.sleep(10.msecs);
        }

        assert (sum == 270);
    }, options);
}

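// A sketch of how the polling loop above could be replaced with the reactor's own
// Barrier (the same primitive the task-leak test at the end of this module uses):
// each deferring fiber is registered as a waiter and marks itself done, and
// waitAll() suspends the spawning fiber until every deferred call has completed.
unittest {
    import mecca.reactor: Reactor, testWithReactor;
    import mecca.reactor.sync.barrier: Barrier;

    __gshared static long total;
    __gshared static Barrier barrier;

    static int increment(int x) {
        Thread.sleep(1.msecs);
        return x + 1;
    }

    // a static function taking the value as an argument avoids capturing the
    // loop variable by reference in a closure
    static void incrementFib(int x) {
        total += theReactor.deferToThread!increment(x);
        barrier.markDone();
    }

    Reactor.OpenOptions options;
    options.threadDeferralEnabled = true;

    testWithReactor({
        total = 0;
        foreach(int i; [1, 2, 3]) {
            barrier.addWaiter();
            theReactor.spawnFiber(&incrementFib, i);
        }
        barrier.waitAll(); // suspends until all three markDone() calls have happened
        assert (total == 2 + 3 + 4);
    }, options);
}
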
unittest {
    import mecca.reactor;
    import mecca.reactor.sync.event;
    import mecca.reactor.types : ReactorExit;
    import std.traits;

    static struct Context {
        uint counter;
        shared bool inThread;
        Event started, done;

        void threadBody() {
            assert(!inThread, "inThread flag already set when the thread body started");
            inThread = true;
            scope(exit) inThread = false;
            Thread.sleep(20.msecs);
        }

        static void proxyBody(Context* _this) {
            _this.threadBody();
        }

        void testFini() nothrow {
            counter++;
            done.set();
        }

        static void proxyFini(Context* _this) nothrow {
            return _this.testFini();
        }

        void testFiber() {
            started.set();
            DEBUG!"Deferring to thread"();
            theReactor.deferToThread!(proxyBody, proxyFini)(&this);
            assert(false, "Thread finished successfully when it shouldn't have");
        }
    }

    void testBody() {
        Context context;

        auto handle = theReactor.spawnFiber(&context.testFiber);

        context.started.wait();
        // Wait for the thread queue to pick up the new task
        theReactor.sleep(14.msecs);
        theReactor.throwInFiber!ReactorExit(handle);
        assert(context.counter == 0);
        assert(context.inThread);
        context.done.wait(Timeout(50.msecs));
        assert(!context.inThread);
        assert(context.counter == 1);
    }

    Reactor.OpenOptions options;
    options.threadDeferralEnabled = true;
    testWithReactor(&testBody, options);
}

unittest {
    // Make sure an exception thrown from a thread is forwarded as-is
    import mecca.reactor;

    static class SomeException : Exception {
        mixin ExceptionBody!"Just some exception";
    }

    static void threadBody() {
        throw mkEx!SomeException;
    }

    void testBody() {
        try {
            theReactor.deferToThread!threadBody();
        } catch(SomeException ex) {
            // Expecting this
        } catch(Throwable ex) {
            ASSERT!"Received wrong exception %s"(false, ex.msg);
        }
    }

    Reactor.OpenOptions options;
    options.threadDeferralEnabled = true;
    testWithReactor(&testBody, options);
}

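// A minimal sketch of the ExcBuf round-trip that execute() and deferToThread() rely
// on above: the worker stores the thrown object into the task's fixed buffer with
// set(), and the fiber later pulls it back out with get() and re-throws a copy, so
// the exception never has to outlive the pooled task slot. This assumes get()
// returns null while nothing was stored -- which is what deferToThread's null
// check above relies on.
unittest {
    import mecca.lib.exception: ExcBuf;

    ExcBuf buf;
    assert(buf.get() is null); // nothing stored yet, same as a freshly reset task

    try {
        throw new Exception("stored in the buffer");
    } catch (Exception ex) {
        buf.set(ex); // copy the live exception into the fixed buffer
    }

    auto restored = buf.get();
    assert(restored !is null);
    assert(restored.msg == "stored in the buffer");
}
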
unittest {
    // Make sure an exception thrown from a thread doesn't leak a task
    import mecca.reactor;

    static class SomeException : Exception {
        mixin ExceptionBody!"Just some exception";
    }

    static void threadBody() {
        throw mkEx!SomeException;
    }

    static void runThreadThatThrowsException() {
        try {
            theReactor.deferToThread!threadBody();
        } catch(SomeException ex) {
            // Expecting this
        }
    }

    Reactor.OpenOptions options;
    options.threadDeferralEnabled = true;

    testWithReactor({
        auto tasksAvailableInPool() {
            return theReactor.utThreadPool.utTasksPool.numAvailable();
        }

        // Verify that the number of available tasks in the pool doesn't decrease
        const initialTasksAvailableInPool = tasksAvailableInPool();
        runThreadThatThrowsException();
        assertEQ(initialTasksAvailableInPool, tasksAvailableInPool());
    }, options);

    testWithReactor({
        // Run more tasks than UT_MAX_DEFERRED_TASKS -- this crashes if the tasks leak.
        // Run in concurrent chunks to avoid the test running for a very long time.
        import mecca.reactor.sync.barrier: Barrier;
        Barrier barrier;
        enum CHUNK_SIZE = 128;
        uint counter = 0;
        while (counter < Reactor.UT_MAX_DEFERRED_TASKS * 2) {
            foreach(_; 0 .. CHUNK_SIZE) {
                counter++;
                barrier.addWaiter();
                theReactor.spawnFiber({
                    runThreadThatThrowsException();
                    barrier.markDone();
                });
            }
            barrier.waitAll();
        }
    }, options);
}