/// group related fibers so they can all be killed together, if needed
module mecca.reactor.fiber_group;

// Licensed under the Boost license. Full copyright information in the AUTHORS file

import std.traits;

import mecca.containers.lists;
import mecca.lib.exception;
import mecca.lib.time;
import mecca.log;
import mecca.reactor;

/**
 * Group of related fibers that may need to be killed together.
 *
 * A group starts out closed (`State.None`). `open` makes it `Active`, after which fibers may be spawned into it
 * (`spawnFiber`) or may temporarily join it (`runTracked`). `close` moves the group to `Closing`, throws
 * `FiberGroupExtinction` into every other member fiber, and the group returns to `State.None` once the member list
 * is empty.
 *
 * The struct is not copyable, and must be closed by the time it is destructed.
 */
struct FiberGroup {
private:
    /// Exception thrown into member fibers in order to kill them when the group is closed
    package class FiberGroupExtinction: FiberInterrupt {mixin ExceptionBody!"FiberGroup killer exception";}
    /// Intrusive list of member fibers, chained through each fiber's `params.fgChain`
    alias RegisteredFibersList = _LinkedList!(ReactorFiber*, "params.fgChain.next", "params.fgChain.prev", "params.fgChain.owner", false);

    /// Per fiber link fields used by `RegisteredFibersList`
    package struct Chain {
        ReactorFiber* prev, next;
        RegisteredFibersList* owner;
    }

    /// The fibers that are currently members of the group
    RegisteredFibersList fibersList;
    /// Group life cycle
    enum State {
        None,       /// Closed (also the initial state)
        Active,     /// Open: fibers may be added
        Closing,    /// `close` was called, but not all members are known to have left yet
    }
    State state;

public:

    @disable this(this);

    ~this() @safe @nogc nothrow {
        ASSERT!"FiberGroup destructed but is not fully closed"(closed);
    }

    /// Three-state execution result type (did not complete / completed / completed with a value)
    struct ExecutionResult(T) {
        /// Whether execution finished successfully
        bool completed = false;
        static if (!is(T == void)) {
            /// Actual execution result (if type is not void)
            T result;
        }
    }

    /// Initialize a new fiber group
    ///
    /// The group must be fully closed (see `closed`) and have no registered fibers.
    void open() nothrow @safe @nogc {
        ASSERT!"New FiberGroup has fibers registered"(fibersList.empty);
        // The following line has a side effect!! "closed" will set the state to None if it is Closing and all fibers
        // have quit. Since this is an ASSERT (i.e. - does not get compiled away), and since we're overriding the state
        // at the very next statement, this is not a problem.
        ASSERT!"Cannot open fiber group that is not closed: Current state %s"(closed, state);
        state = State.Active;
    }

    /// report whether a fiber group is closed
    ///
    /// This call will return `false` for half-closed groups.
    ///
    /// Note that this property is not a pure query: a group that is `Closing` and has no members left is moved to
    /// the closed state by this call.
    @property closed() pure nothrow @safe @nogc {
        // If we closed without waiting, we might still be in Closing state.
        if( state==State.Closing && fibersList.empty )
            state=State.None;

        return state == State.None;
    }

    /**
     * close the group (killing all fibers).
     *
     * It is legal for the calling fiber to be part of the group. If `waitForExit` is true, it will be killed only once
     * the function is done waiting for the other fibers to exit.
     *
     * Calling `close` on a group that is not currently active does nothing.
     *
     * Params:
     * waitForExit = Whether the function waits for all fibers to exit before returning.
     *
     * Throws:
     * `FiberGroupExtinction` if the calling fiber is itself a member of the group.
     *
     * Notes:
     * If waitForExit is false, the group cannot be reused before all fibers have actually exited.
     */
    void close(bool waitForExit = true) @safe @nogc {
        if( state!=State.Active )
            return;

        auto cs = theReactor.criticalSection();
        state = State.Closing;

        // Set if the calling fiber is a member of the group it is closing
        bool suicide;

        auto ourFiberId = theReactor.currentFiberId;
        auto killerException = mkEx!FiberGroupExtinction;
        foreach(fiber; fibersList.range) {
            if( fiber.identity == ourFiberId ) {
                // Don't throw into ourselves. We throw killerException explicitly once we are done.
                suicide = true;
                continue;
            }

            // WARN_AS!"Fiber killed by group"(fiber.identity);
            WARN!"Killing fiber %s"(fiber.identity);
            theReactor.throwInFiber!FiberGroupExtinction(FiberHandle(fiber));
        }

        if( !waitForExit ) {
            if( suicide ) {
                WARN!"Fiber commiting suicid as part of group"();
                throw killerException;
            }
            return;
        }

        cs.leave();

        // We must not be on the list while waiting for it to drain, or the wait would never end
        if( suicide )
            removeThisFiber();

        {
            // If the wait is interrupted by an exception we are left outside the member list, while the code that
            // made us a member (fiberWrapper, runTracked) still calls removeThisFiber on its way out, which ASSERTs
            // membership. Restore the membership so that the exception propagates cleanly. The group remains in
            // Closing state in that case.
            scope(failure) {
                if( suicide )
                    fibersList.append(theReactor.currentFiberPtr);
            }

            waitEmpty();
        }
        DEBUG!"All group fibers are now dead. May they rest in pieces"();

        state = State.None;

        if (suicide) {
            // For consistency, add ourselves back.
            addThisFiber(true);
            WARN!"Fiber commiting suicid as part of group"();
            throw killerException;
        }
    }

    /// Wait for all fibers in a group to exit.
    ///
    /// This function does not initiate termination. Use `close` to actually terminate the fibers.
    ///
    /// Params:
    /// timeout = passed as is to `theReactor.joinFiber` for each fiber waited on.
    void waitEmpty(Timeout timeout = Timeout.infinite) @safe @nogc {
        // Wait for all fibers to die an agonizing death
        while( !fibersList.empty ) {
            theReactor.joinFiber( FiberHandle(fibersList.head), timeout );
        }
    }

    /// Return true if the current fiber is a member of the group
    bool isCurrentFiberMember() const nothrow @safe @nogc {
        return theReactor.currentFiberPtr in fibersList;
    }

    // DMDBUG https://issues.dlang.org/show_bug.cgi?id=16206
    // The non-template version must come before the templated version
    /**
     * Spawn a new fiber that will be a member of the group.
     *
     * Arguments and return value are the same as for theReactor.spawnFiber.
     *
     * The group must be active when this function is called. The new fiber registers itself with the group only
     * when it starts running. If the group is no longer active by then, the fiber exits without running the
     * supplied function.
     */
    FiberHandle spawnFiber(void delegate() dg) nothrow @safe @nogc {
        static void wrapper(void delegate() dg) @system {
            dg();
        }
        return spawnFiber!wrapper(dg);
    }

    /// ditto
    FiberHandle spawnFiber(alias F)(ParameterTypeTuple!F args) nothrow @safe @nogc {
        ASSERT!"FiberGroup state not Active: %s"(state == State.Active, state);
        alias funcType = typeof(F);
        // The fiber receives a pointer to this group. The group must not move while the fiber is alive.
        auto fib = theReactor.spawnFiber( &fiberWrapper!funcType, &F, &this, args );

        return fib;
    }

    /**
     * Conditionally spawn a new fiber, only if the fiber group is currently open.
     *
     * This function is useful in certain racy cases, where the fiber group has been closed, but has not yet finished closing.
     *
     * Params:
     * dg = the delegate to run inside the fiber
     *
     * Returns:
     * The FiberHandle of the new fiber if successful, the invalid FiberHandle if the fiber group is closed or closing.
     */
    FiberHandle spawnFiberIfOpen(void delegate() dg) nothrow @safe @nogc {
        if( state != State.Active )
            return FiberHandle.init;

        return spawnFiber(dg);
    }

    /**
     * Perform a task inside the current fiber as part of the group.
     *
     * This function temporarily adds the current fiber to the group for the sake of performing a specific function.
     * Once that function is done, the fiber leaves the group again.
     *
     * If the group is killed while inside this function, the function returns early and the return type has the member
     * `completed` set to false. If the function ran to completion, `completed` is set to true, and `result` is set to
     * the function's return value (if one exists).
     *
     * If the fiber is already a member of the group when this function is called, the function is simply executed
     * normally. In particular, a `FiberGroupExtinction` is not caught in that case, and propagates to the caller.
     *
     * Exceptions other than `FiberGroupExtinction` thrown by the function are passed on to the caller.
     */
    @notrace auto runTracked(alias F)(ParameterTypeTuple!F args) {
        alias R = ReturnType!F;
        ExecutionResult!R res;

        if(isCurrentFiberMember()) {
            // No need to attach again
            return invoke!F(args);
        }

        addThisFiber();
        // Cleared once the catch clause below has already removed us, so we are not removed twice
        bool fiberAdded = true;
        scope(exit) {
            if(fiberAdded)
                removeThisFiber();
        }

        try {
            res = invoke!F(args);
        } catch( FiberGroupExtinction ex ) {
            // res is left at its initial value, i.e. completed == false
            WARN!"Fiber %s killed in contained context by FiberGroup"(theReactor.currentFiberId);
            removeThisFiber();
            fiberAdded = false;
        }

        return res;
    }

    /// ditto
    auto runTracked(T)(scope T delegate() dg) {
        static T wrapper(scope T delegate() dg) {
            return dg();
        }
        return runTracked!(wrapper)(dg);
    }

private:
    /// Register the current fiber as a member of the group.
    ///
    /// Params:
    /// duringSuicide = set by `close` when re-adding the closing fiber, at which point the state is already `None`.
    @notrace void addThisFiber(bool duringSuicide = false) nothrow @safe @nogc {
        ASSERT!"FiberGroup state not Active: %s"(state == State.Active || state==State.None && duringSuicide, state);
        auto fib = theReactor.currentFiberPtr;
        DBG_ASSERT!"Trying to add fiber already in group"( fib !in fibersList );
        DBG_ASSERT!"Trying to add fiber to group which is already member of another group"( fib.params.fgChain.owner is null );
        fibersList.append(fib);
    }

    /// Remove the current fiber from the group. The fiber must be a member.
    @notrace void removeThisFiber() nothrow @safe @nogc {
        ASSERT!"FiberGroup asked to remove fiber which is not a member"(isCurrentFiberMember());
        fibersList.remove(theReactor.currentFiberPtr);
    }

    /// Call `F` with `args`, and wrap the outcome in an `ExecutionResult` with `completed` set.
    ///
    /// If `F` throws, the exception propagates and no result is returned.
    @notrace static auto invoke(alias F)(ParameterTypeTuple!F args) {
        alias R = ReturnType!F;
        ExecutionResult!R res;

        static if (is(R == void)) {
            F(args);
        } else {
            res.result = F(args);
        }
        res.completed = true;

        return res;
    }

    /// Body of fibers started by `spawnFiber`: join the group, run the user function, leave the group.
    @notrace static void fiberWrapper(T)(T* fn, FiberGroup* fg, ParameterTypeTuple!T args) {
        // The group may have been closed between the spawn and the first time this fiber got to run
        if( fg.state!=State.Active ) {
            WARN!"Fiber group closed before fiber managed to start"();
            return;
        }

        fg.addThisFiber();
        scope(exit) fg.removeThisFiber();
        theReactor.setFiberName(theReactor.currentFiberHandle, "FiberGroupMember", fn);
        fn(args);
    }
}

unittest {
    static int counter;
    counter = 0;

    static void fib(int num) {
        // The only way out of this function is by being killed
        scope(success) assert(false);

        while (true) {
            counter += num;
            theReactor.sleep(msecs(1));
        }
    }

    testWithReactor({
        FiberGroup tracker;

        {
            DEBUG!"#UT Test fibers running and being killed"();
            tracker.open();

            tracker.addThisFiber();
            scope(exit) tracker.removeThisFiber();
            tracker.spawnFiber!fib(1);
            tracker.spawnFiber!fib(100);
            theReactor.sleep(msecs(50));
            // this fiber won't get to run
            tracker.spawnFiber!fib(10000);

            bool caught = false;
            try {
                tracker.close();
            }
            catch (tracker.FiberGroupExtinction ex) {
                caught = true;
            }

            theReactor.sleep(2.msecs);

            assert(caught, "this fiber did not commit suicide");
            assert(counter > 0, "no fibers have run");
            assert(counter < 10000, "third fiber should not have run");
        }

        {
            int counter2 = 0;

            tracker.open();

            static class SomeException: Exception {mixin ExceptionBody!"Some exception";}

            // Exceptions other than the group's killer exception must reach the caller
            static bool caught = false;
            try {
                tracker.runTracked({
                    throw mkEx!SomeException;
                });
            } catch (SomeException ex) {
                caught = true;
            }
            assert(caught, "exception wasn't passed up");

            // Closing the group from another fiber must terminate the tracked function
            tracker.runTracked({
                theReactor.registerTimer(Timeout(msecs(20)), (){
                    theReactor.spawnFiber({
                        tracker.close();
                    });
                });

                scope(success) assert(false);

                while (true) {
                    counter2++;
                    theReactor.sleep(msecs(1));
                }
            });

            assert(counter2 > 0, "no fibers have run");
            assert(counter2 < 10000, "tracked function was not killed by closing the group");
        }

        {
            // test fiber suicide
            tracker.open();
            tracker.spawnFiber({tracker.close();});
            theReactor.sleep(msecs(1));
            assert(tracker.closed());
        }
    });
}
/*
   Make sure nested calls work correctly
 */
unittest {
    import std.exception;
    import std.stdio;

    testWithReactor({
        FiberGroup foo;
        foo.open();

        // Make sure we dont catch the fiberbomb in the nested call
        auto res1 = foo.runTracked({
            foo.runTracked({
                throw mkEx!(foo.FiberGroupExtinction);
            });
            assert(false, "Nested call caught the fiber bomb!");
        });
        assert(res1.completed == false, "res1 marked as completed when it shouldn't have");

        // Make sure we dont catch the fiberbomb in a deeply nested call
        auto res2 = foo.runTracked({
            foo.runTracked({
                foo.runTracked({
                    foo.runTracked({
                        foo.runTracked({
                            throw mkEx!(foo.FiberGroupExtinction);
                        });
                        assert(false, "Nested call caught the fiber bomb!");
                    });
                    assert(false, "Nested call caught the fiber bomb!");
                });
                assert(false, "Nested call caught the fiber bomb!");
            });
            assert(false, "Nested call caught the fiber bomb!");
        });
        assert(res2.completed == false, "res2 marked as completed when shouldn't have");

        foo.close();
    });
}

/*
   Spawn fibers on inactive trackers
 */
unittest {
    import std.exception;
    import std.stdio;

    testWithReactor({
        FiberGroup foo;

        void dlg() {}

        // Never opened
        assert( !foo.spawnFiberIfOpen(&dlg).isValid );

        foo.open();

        assert(foo.spawnFiberIfOpen(&dlg).isValid);

        foo.close();

        // Closed again
        assert( !foo.spawnFiberIfOpen(&dlg).isValid);
    });
}

/*
   Close the group from a fiber that is itself a (temporary) member, with and without waiting
 */
unittest {
    uint counter = 0;
    FiberGroup tracker;

    void fib1() {
        scope(exit) {
            counter++;
        }
        theReactor.sleep(10.msecs);
    }

    testWithReactor(
        {
            static void closer(FiberGroup* fg, bool wait) {
                fg.close(wait);
            }

            {
                tracker.open();

                tracker.spawnFiber(&fib1);
                tracker.spawnFiber(&fib1);

                theReactor.yield();

                // Waiting close: both fibers have exited by the time runTracked returns
                tracker.runTracked!closer(&tracker, true);
                assertEQ(counter, 2);

                counter=0;
                tracker.open();

                tracker.spawnFiber(&fib1);
                tracker.spawnFiber(&fib1);

                theReactor.yield();

                // Non waiting close: the fibers only exit once they get to run
                tracker.runTracked!closer(&tracker, false);
                assertEQ(counter, 0);
                theReactor.yield();
                assertEQ(counter, 2);
            }

            tracker.close();
        });
}

unittest {
    META!"Make sure that close(false) works properly"();

    int counter;

    void fiberBody() {
        scope(exit) counter++;

        theReactor.sleep(10.days);
    }

    testWithReactor({
        FiberGroup group;

        group.open();

        group.spawnFiber(&fiberBody);
        group.spawnFiber(&fiberBody);

        assertEQ(counter, 0);
        theReactor.yield();

        assertEQ(counter, 0);
        assert(!group.closed);

        group.close(false);

        // Half closed: the fibers were told to die, but did not get to run yet
        assert(!group.closed);
        assertThrows!AssertError(group.open());

        theReactor.yield();
        assert(group.closed);

        group.open();
        group.close();
    });
}