1 /// Fiber future result execution 2 module mecca.reactor.sync.future; 3 4 // Licensed under the Boost license. Full copyright information in the AUTHORS file 5 6 import std.exception; 7 import std.traits; 8 9 import mecca.log; 10 import mecca.lib.exception; 11 import mecca.lib.time; 12 import mecca.reactor; 13 import mecca.reactor.utils; 14 import mecca.reactor.sync.event: Signal; 15 16 /// Custom exception thrown by `Future.get` if the fiber quit without setting a result 17 class FiberKilledWithNoResult : Exception { 18 mixin ExceptionBody!"Future controlled fiber killed without setting a result"; 19 } 20 21 /// Representation for future calculation result 22 @notrace 23 struct Future(T) { 24 private: 25 Signal suspender; 26 FiberHandle fiberHandle; 27 ExcBuf _exBuf; 28 static if (!is(T == void)) { 29 @notrace T _value; 30 } 31 bool isSet = false; 32 33 public: 34 @disable this(this); 35 36 /// Returns whether the future has a return value 37 @property bool ready() const pure nothrow @safe @nogc { 38 return isSet; 39 } 40 41 /// Returns the exception stored in the future's result (if any) 42 @property Throwable exception() @safe @nogc { 43 enforceNGC(isSet, "Future not yet set"); 44 return _exBuf.get(); 45 } 46 47 /// Wait for the future to be set 48 @notrace @nogc 49 void wait(Timeout timeout = Timeout.infinite, string file = __FILE_FULL_PATH__, ulong line = __LINE__) { 50 if (fiberHandle.isSet) { 51 theReactor.joinFiber(fiberHandle, timeout); 52 53 if(!isSet) { 54 // WEKAPP-53185: fiber was killed before even starting 55 _exBuf.construct!FiberKilledWithNoResult(file, line, true); 56 internalRaise(); 57 } 58 } 59 60 if (!isSet) { 61 suspender.wait(timeout); 62 } 63 } 64 65 /// Set the future's state to raised exception 66 @notrace @nogc void raise(Throwable ex) { 67 // we MUST copy the exception here since we may later refer to exceptions that came from defunct fibers 68 // and the fiber itself is GC-allocated, which confuses toGC 69 _exBuf.set(ex); 70 DEBUG!"Future.raise ex=%s _ex=%s"(cast(void*)ex, cast(void*)_exBuf.get); 71 internalRaise(); 72 } 73 74 @notrace @nogc private void internalRaise() { 75 enforceNGC(!isSet, "Future already set"); 76 isSet = true; 77 suspender.signal(); 78 } 79 80 /// Get the result stored in the future 81 /// 82 /// If the future is not yet set, will wait for it to become ready. 83 @notrace @nogc 84 auto ref get(Timeout timeout = Timeout.infinite) { 85 wait(timeout); 86 if (_exBuf.get !is null) { 87 throw setEx(_exBuf.get()); 88 } 89 static if (!is(T == void)) { 90 return _value; 91 } 92 } 93 94 /// Sets the future 95 static if (is(T == void)) { 96 // DDOXBUG this documentation will not be picked due to the static if 97 98 /// Sets the future with no value 99 @notrace void set() @nogc @safe nothrow { 100 ASSERT!"Future already set"(!isSet); 101 DBG_ASSERT!"Future exception already set"(_exBuf.get is null); 102 isSet = true; 103 suspender.signal(); 104 } 105 } 106 else { 107 /// Sets the future with value 108 @notrace void set(const ref T value) @nogc @safe nothrow { 109 ASSERT!"Future already set"(!isSet); 110 DBG_ASSERT!"Future exception already set"(_exBuf.get is null); 111 isSet = true; 112 _value = value; 113 suspender.signal(); 114 } 115 116 /// ditto 117 @notrace void set(T value) @nogc @safe nothrow { 118 ASSERT!"Future already set"(!isSet); 119 DBG_ASSERT!"Future exception already set"(_exBuf.get is null); 120 isSet = true; 121 _value = value; 122 suspender.signal(); 123 } 124 } 125 126 /** 127 * Launch a new fiber that will run the specified callback, set the future when the callback returns 128 * 129 * The first form runs F function with all arguments. 130 * 131 * The second form does the same, but specified a different context to spawn the fiber in. The first argument can 132 * be any object with a `spawnFiber` function. The most obvious example is a `FiberGroup` for the fiber to belong 133 * to. 134 * 135 * The third and fourth forms are for running a supplied delegate instead of an aliased function. 136 */ 137 @notrace auto runInFiber(alias F)(Parameters!F args) { 138 return runInFiber!F(theReactor, args); 139 } 140 141 /// ditto 142 @notrace auto runInFiber(alias F, Runner)(ref Runner runner, Parameters!F args) { 143 alias RetType = ReturnType!F; 144 static void run(FiberPointer!(Future!RetType) pFut, Parameters!F args) { 145 try { 146 static if (is(RetType == void)) { 147 F(args); 148 if (pFut.isValid) { 149 pFut.set(); 150 } 151 } 152 else { 153 auto tmp = F(args); 154 if (pFut.isValid) { 155 pFut.set(tmp); 156 } 157 } 158 } 159 catch (Exception ex) { 160 if (pFut.isValid) { 161 pFut.raise(ex); 162 } 163 else { 164 throw setEx(ex); 165 } 166 } 167 } 168 169 fiberHandle = runner.spawnFiber!run(FiberPointer!(Future!RetType)(&this), args); 170 return fiberHandle; 171 } 172 173 /// ditto 174 @notrace auto runInFiber(T)(T delegate() dlg) { 175 runInFiber(theReactor, dlg); 176 } 177 178 /// ditto 179 @notrace auto runInFiber(T, Runner)(ref Runner runner, T delegate() dlg) { 180 static auto proxyCall(T delegate() dlg) { 181 return dlg(); 182 } 183 184 return runInFiber!proxyCall(runner, dlg); 185 } 186 } 187 188 unittest { 189 testWithReactor({ 190 static void fib1(Future!void* fut) { 191 theReactor.sleep(msecs(20)); 192 fut.set(); 193 } 194 195 static void fib2(Future!int* fut) { 196 theReactor.sleep(msecs(10)); 197 fut.set(88); 198 } 199 200 static void fib3(Future!string* fut) { 201 theReactor.sleep(msecs(15)); 202 fut.raise(new Exception("boom")); 203 } 204 205 Future!void f1; 206 Future!int f2; 207 Future!string f3; 208 209 theReactor.spawnFiber!fib1(&f1); 210 theReactor.spawnFiber!fib2(&f2); 211 theReactor.spawnFiber!fib3(&f3); 212 213 f1.get(); 214 assertEQ(f2.get(), 88); 215 assert(f3.ready, "f3 not ready"); 216 assert(f3.exception, "f3 not exception"); 217 bool failed = false; 218 try { 219 f3.get(); 220 } 221 catch (Exception ex) { 222 assertEQ(ex.msg, "boom"); 223 failed = true; 224 } 225 assert(failed); 226 }); 227 } 228 229 unittest { 230 // WEKAPP-53185 231 static void func() { 232 } 233 234 testWithReactor({ 235 Future!void fut; 236 auto fib = fut.runInFiber!func(); 237 // Kill the fiber before it gets a change to receive the CPU 238 theReactor.throwInFiber!FiberKilled(fib); 239 240 assertThrows!FiberKilledWithNoResult( fut.get( Timeout(12.msecs) ) ); 241 }); 242 }