1 /// Fiber future result execution
2 module mecca.reactor.sync.future;
3 
4 // Licensed under the Boost license. Full copyright information in the AUTHORS file
5 
6 import std.exception;
7 import std.traits;
8 
9 import mecca.log;
10 import mecca.lib.exception;
11 import mecca.lib.time;
12 import mecca.reactor;
13 import mecca.reactor.utils;
14 import mecca.reactor.sync.event: Signal;
15 
16 /// Custom exception thrown by `Future.get` if the fiber quit without setting a result
17 class FiberKilledWithNoResult : Exception {
18     mixin ExceptionBody!"Future controlled fiber killed without setting a result";
19 }
20 
21 /// Representation for future calculation result
22 @notrace
23 struct Future(T) {
24 private:
25     Signal suspender;
26     FiberHandle fiberHandle;
27     ExcBuf _exBuf;
28     static if (!is(T == void)) {
29         @notrace T _value;
30     }
31     bool isSet = false;
32 
33 public:
34     @disable this(this);
35 
36     /// Returns whether the future has a return value
37     @property bool ready() const pure nothrow @safe @nogc {
38         return isSet;
39     }
40 
41     /// Returns the exception stored in the future's result (if any)
42     @property Throwable exception() @safe @nogc  {
43         enforceNGC(isSet, "Future not yet set");
44         return _exBuf.get();
45     }
46 
47     /// Wait for the future to be set
48     @notrace @nogc
49     void wait(Timeout timeout = Timeout.infinite, string file = __FILE_FULL_PATH__, ulong line = __LINE__) {
50         if (fiberHandle.isSet) {
51             theReactor.joinFiber(fiberHandle, timeout);
52 
53             if(!isSet) {
54                 // WEKAPP-53185: fiber was killed before even starting
55                 _exBuf.construct!FiberKilledWithNoResult(file, line, true);
56                 internalRaise();
57             }
58         }
59 
60         if (!isSet) {
61             suspender.wait(timeout);
62         }
63     }
64 
65     /// Set the future's state to raised exception
66     @notrace @nogc void raise(Throwable ex) {
67         // we MUST copy the exception here since we may later refer to exceptions that came from defunct fibers
68         // and the fiber itself is GC-allocated, which confuses toGC
69         _exBuf.set(ex);
70         DEBUG!"Future.raise ex=%s _ex=%s"(cast(void*)ex, cast(void*)_exBuf.get);
71         internalRaise();
72     }
73 
74     @notrace @nogc private void internalRaise() {
75         enforceNGC(!isSet, "Future already set");
76         isSet = true;
77         suspender.signal();
78     }
79 
80     /// Get the result stored in the future
81     ///
82     /// If the future is not yet set, will wait for it to become ready.
83     @notrace @nogc
84     auto ref get(Timeout timeout = Timeout.infinite) {
85         wait(timeout);
86         if (_exBuf.get !is null) {
87             throw setEx(_exBuf.get());
88         }
89         static if (!is(T == void)) {
90             return _value;
91         }
92     }
93 
94     /// Sets the future
95     static if (is(T == void)) {
96         // DDOXBUG this documentation will not be picked due to the static if
97 
98         /// Sets the future with no value
99         @notrace void set() @nogc @safe nothrow {
100             ASSERT!"Future already set"(!isSet);
101             DBG_ASSERT!"Future exception already set"(_exBuf.get is null);
102             isSet = true;
103             suspender.signal();
104         }
105     }
106     else {
107         /// Sets the future with value
108         @notrace void set(const ref T value) @nogc @safe nothrow {
109             ASSERT!"Future already set"(!isSet);
110             DBG_ASSERT!"Future exception already set"(_exBuf.get is null);
111             isSet = true;
112             _value = value;
113             suspender.signal();
114         }
115 
116         /// ditto
117         @notrace void set(T value) @nogc @safe nothrow {
118             ASSERT!"Future already set"(!isSet);
119             DBG_ASSERT!"Future exception already set"(_exBuf.get is null);
120             isSet = true;
121             _value = value;
122             suspender.signal();
123         }
124     }
125 
126     /**
127      * Launch a new fiber that will run the specified callback, set the future when the callback returns
128      *
129      * The first form runs F function with all arguments.
130      *
131      * The second form does the same, but specified a different context to spawn the fiber in. The first argument can
132      * be any object with a `spawnFiber` function. The most obvious example is a `FiberGroup` for the fiber to belong
133      * to.
134      *
135      * The third and fourth forms are for running a supplied delegate instead of an aliased function.
136      */
137     @notrace auto runInFiber(alias F)(Parameters!F args) {
138         return runInFiber!F(theReactor, args);
139     }
140 
141     /// ditto
142     @notrace auto runInFiber(alias F, Runner)(ref Runner runner, Parameters!F args) {
143         alias RetType = ReturnType!F;
144         static void run(FiberPointer!(Future!RetType) pFut, Parameters!F args) {
145             try {
146                 static if (is(RetType == void)) {
147                     F(args);
148                     if (pFut.isValid) {
149                         pFut.set();
150                     }
151                 }
152                 else {
153                     auto tmp = F(args);
154                     if (pFut.isValid) {
155                         pFut.set(tmp);
156                     }
157                 }
158             }
159             catch (Exception ex) {
160                 if (pFut.isValid) {
161                     pFut.raise(ex);
162                 }
163                 else {
164                     throw setEx(ex);
165                 }
166             }
167         }
168 
169         fiberHandle = runner.spawnFiber!run(FiberPointer!(Future!RetType)(&this), args);
170         return fiberHandle;
171     }
172 
173     /// ditto
174     @notrace auto runInFiber(T)(T delegate() dlg) {
175         runInFiber(theReactor, dlg);
176     }
177 
178     /// ditto
179     @notrace auto runInFiber(T, Runner)(ref Runner runner, T delegate() dlg) {
180         static auto proxyCall(T delegate() dlg) {
181             return dlg();
182         }
183 
184         return runInFiber!proxyCall(runner, dlg);
185     }
186 }
187 
188 unittest {
189     testWithReactor({
190         static void fib1(Future!void* fut) {
191             theReactor.sleep(msecs(20));
192             fut.set();
193         }
194 
195         static void fib2(Future!int* fut) {
196             theReactor.sleep(msecs(10));
197             fut.set(88);
198         }
199 
200         static void fib3(Future!string* fut) {
201             theReactor.sleep(msecs(15));
202             fut.raise(new Exception("boom"));
203         }
204 
205         Future!void f1;
206         Future!int f2;
207         Future!string f3;
208 
209         theReactor.spawnFiber!fib1(&f1);
210         theReactor.spawnFiber!fib2(&f2);
211         theReactor.spawnFiber!fib3(&f3);
212 
213         f1.get();
214         assertEQ(f2.get(), 88);
215         assert(f3.ready, "f3 not ready");
216         assert(f3.exception, "f3 not exception");
217         bool failed = false;
218         try {
219             f3.get();
220         }
221         catch (Exception ex) {
222             assertEQ(ex.msg, "boom");
223             failed = true;
224         }
225         assert(failed);
226     });
227 }
228 
229 unittest {
230     // WEKAPP-53185
231     static void func() {
232     }
233 
234     testWithReactor({
235             Future!void fut;
236             auto fib = fut.runInFiber!func();
237             // Kill the fiber before it gets a change to receive the CPU
238             theReactor.throwInFiber!FiberKilled(fib);
239 
240             assertThrows!FiberKilledWithNoResult( fut.get( Timeout(12.msecs) ) );
241         });
242 }