1 module mecca.reactor.platform.kqueue;
2 
3 version (Kqueue):
4 package:
5 
/// Name under which this module exposes its poller implementation: `Poller` is the `Kqueue` struct.
alias Poller = Kqueue;
7 
8 struct Kqueue
9 {
10     public import mecca.reactor.subsystems.poller : FdContext;
11 
12     import core.time : Duration, msecs;
13 
14     import mecca.containers.pools : FixedPool;
15     import mecca.lib.exception : ASSERT, DBG_ASSERT, enforceNGC, errnoEnforceNGC;
16     import mecca.lib.io : FD;
17     import mecca.lib.reflection : setToInit;
18     import mecca.lib.time : Timeout;
19     import mecca.log : INFO, notrace;
20     import mecca.platform.os : OSSignal;
21     import mecca.reactor : theReactor;
22     import mecca.reactor.platform : EVFILT_READ, EVFILT_WRITE, EVFILT_SIGNAL, kevent64, kevent64_s;
23     import mecca.reactor.subsystems.poller : Direction;
24 
    /// Delegate type accepted by `registerSignalHandler`; `reactorIdle` invokes it with the
    /// `OSSignal` whose event fired.
    package(mecca.reactor) alias SignalHandler = void delegate(OSSignal) @system;
26 
    private
    {
        // NOTE(review): not referenced anywhere in the visible part of this file.
        enum MIN_DURATION = 1.msecs;
        // Capacity of both the pending-change batch (`changes`) and the receive buffer (`events`).
        enum NUM_BATCH_EVENTS = 32;
        // Capacity of `fdPool`; every registered FD and every signal handler takes one slot.
        enum MAX_CONCURRENT_FDS = 512;

        // Descriptor returned by kqueue(); assigned in open(), closed in close().
        FD kqueueFd;
        // Pool that registerFD and registerSignalHandler allocate their FdContext from.
        FixedPool!(FdContext, MAX_CONCURRENT_FDS) fdPool;

        // Number of leading entries of `changes` that are queued but not yet passed to kevent64.
        int numberOfChanges;
        // Change records accumulated by queueEvent until they are flushed.
        kevent64_s[NUM_BATCH_EVENTS] changes;
        // Buffer that kevent64 fills with triggered events in reactorIdle.
        kevent64_s[NUM_BATCH_EVENTS] events;
    }
40 
41     invariant
42     {
43         assert(changes.length > numberOfChanges);
44     }
45 
46     void open() @safe
47     {
48         enum assertMessage = "Must call theReactor.setup before calling " ~
49             "ReactorFD.openReactor";
50 
51         ASSERT!assertMessage(theReactor.isOpen);
52 
53         const fd = kqueue();
54         errnoEnforceNGC(fd >= 0, "Failed to create kqueue file descriptor");
55         kqueueFd = FD(fd);
56 
57         fdPool.open();
58     }
59 
60     void close()
61     {
62         kqueueFd.close();
63     }
64 
    /// Returns: `true` while the kqueue descriptor held by this poller is valid.
    @property bool isOpen() const pure nothrow @safe @nogc
    {
        return kqueueFd.isValid;
    }
69 
70     FdContext* registerFD(ref FD fd, bool alreadyNonBlocking = false) @safe @nogc
71     {
72         import core.sys.posix.fcntl : F_SETFL, FD_CLOEXEC, O_NONBLOCK;
73 
74         enum reactorMessage = "registerFD called outside of an open reactor";
75         enum fdMessage = "registerFD called without first calling " ~
76             "ReactorFD.openReactor";
77 
78         ASSERT!reactorMessage(theReactor.isOpen);
79         ASSERT!fdMessage(kqueueFd.isValid);
80 
81         FdContext* ctx = fdPool.alloc();
82         setToInit(ctx);
83         ctx.fdNum = fd.fileNo;
84         scope(failure) fdPool.release(ctx);
85 
86         if (!alreadyNonBlocking)
87         {
88             const res = fcntl(fd.fileNo, F_SETFL, O_NONBLOCK | FD_CLOEXEC);
89             errnoEnforceNGC(res >=0 , "Failed to set fd to non-blocking mode");
90         }
91 
92         internalRegisterFD(fd.fileNo, ctx, Direction.Both);
93 
94         return ctx;
95     }
96 
    /// Removes `fd` from the kqueue and returns its context to the pool.
    ///
    /// Params:
    ///   fd = descriptor that was passed to `registerFD`
    ///   ctx = context returned by `registerFD`; must not be used after this call
    ///   fdIsClosing = forwarded to `internalDeregisterFD`, which skips queueing the
    ///       removal when it is `true`
    void deregisterFd(ref FD fd, FdContext* ctx, bool fdIsClosing = false) nothrow @safe @nogc
    {
        internalDeregisterFD(fd.fileNo, ctx, fdIsClosing);

        // Release only after the removal has been handled, since it is passed ctx.
        fdPool.release(ctx);
    }
103 
104     void waitForEvent(FdContext* ctx, int fd, Direction dir, Timeout timeout = Timeout.infinite) @safe @nogc
105     {
106         // XXX Relax this restriction if there is a need
107         DBG_ASSERT!"Cannot wait for both in and out events"(dir != Direction.Both);
108         auto ctxState = &ctx.states[dir];
109         with(FdContext.Type) final switch(ctxState.type) {
110         case None:
111             break;
112         case FiberHandle:
113             ASSERT!"Two fibers cannot wait on the same ReactorFD %s at once: %s asked to wait with %s already waiting"(
114                     false, fd, theReactor.currentFiberHandle.fiberId, ctxState.fibHandle.fiberId );
115             break;
116         case Callback:
117         case CallbackOneShot:
118             ASSERT!"Cannot wait on FD %s already waiting on a callback"(false, fd, dir);
119             break;
120         case SignalHandler:
121             ASSERT!"Cannot wait on signal %s already waiting on a signal handler"(false, fd);
122             break;
123         }
124         ctxState.type = FdContext.Type.FiberHandle;
125         scope(exit) ctxState.type = FdContext.Type.None;
126         ctxState.fibHandle = theReactor.currentFiberHandle;
127 
128         theReactor.suspendCurrentFiber(timeout);
129     }
130 
131     @notrace void registerFdCallback(
132             FdContext* ctx, Direction dir, void delegate(void*) callback, void* opaq, bool oneShot)
133             nothrow @trusted @nogc
134     {
135         DBG_ASSERT!"Direction may not be Both"(dir!=Direction.Both);
136         auto state = &ctx.states[dir];
137         INFO!"Registered callback %s on fd %s one shot %s"(&callback, ctx.fdNum, oneShot);
138         ASSERT!"Trying to register callback on busy FD %s: state %s"(
139                 state.type==FdContext.Type.None, ctx.fdNum, state.type );
140         ASSERT!"Cannot register a null callback on FD %s"( callback !is null, ctx.fdNum );
141 
142         state.type = oneShot ? FdContext.Type.CallbackOneShot : FdContext.Type.Callback;
143         state.callback = callback;
144         state.opaq = opaq;
145     }
146 
147     @notrace void unregisterFdCallback(FdContext* ctx, Direction dir) nothrow @trusted @nogc {
148         DBG_ASSERT!"Direction may not be Both"(dir!=Direction.Both);
149         auto state = &ctx.states[dir];
150         INFO!"Unregistered callback on fd %s"(ctx.fdNum);
151         ASSERT!"Trying to deregister callback on non-registered FD %s: state %s"(
152                 state.type==FdContext.Type.Callback || state.type==FdContext.Type.CallbackOneShot, ctx.fdNum, state.type);
153 
154         state.type = FdContext.Type.None;
155     }
156 
157     package(mecca.reactor) @notrace FdContext* registerSignalHandler(OSSignal signal, SignalHandler handler)
158         @trusted @nogc
159     {
160         enum reactorMessage = "registerFD called outside of an open reactor";
161         enum fdMessage = "registerFD called without first calling " ~
162             "ReactorFD.openReactor";
163 
164         ASSERT!reactorMessage(theReactor.isOpen);
165         ASSERT!fdMessage(kqueueFd.isValid);
166 
167         auto ctx = fdPool.alloc();
168         setToInit(ctx);
169 
170         auto state = &ctx.states[Direction.Read];
171         state.type = FdContext.Type.SignalHandler;
172         state.signalHandler = handler;
173         ctx.fdNum = signal;
174         scope(failure) fdPool.release(ctx);
175 
176         internalRegisterSignalHandler(ctx);
177         return ctx;
178     }
179 
    /// Removes the signal registration described by `ctx` and returns `ctx` to the pool.
    ///
    /// Params:
    ///   ctx = context returned by `registerSignalHandler`; must not be used after this call
    package(mecca.reactor) @notrace void unregisterSignalHandler(FdContext* ctx) nothrow @safe @nogc
    {
        internalDeregisterSignalHandler(ctx);
        fdPool.release(ctx);
    }
185 
    /// Export of the poller function
    ///
    /// Runs one `reactorIdle` pass with a zero timeout and discards its result.
    ///
    /// A variation of this function is what's called by the reactor idle callback (unless `OpenOptions.registerDefaultIdler`
    /// is set to `false`).
    @notrace void poll()
    {
        reactorIdle(Duration.zero);
    }
194 
    /// Submits the queued changes to the kqueue, collects up to `events.length` events and
    /// dispatches each one to the fiber, callback or signal handler stored in its context.
    ///
    /// Params:
    ///   timeout = how long kevent64 may wait; `Duration.max` is passed on as a null
    ///       timespec pointer
    ///
    /// Returns: always `true`, including when the call was interrupted by a signal.
    ///
    /// Throws: through `errnoEnforceNGC` when kevent64 fails with anything but `EINTR`, or when
    /// an event carries `EV_ERROR` with a code other than `EPIPE`/`EBADF`.
    @notrace bool reactorIdle(Duration timeout)
    {
        import core.stdc.errno : EINTR, EPIPE, EBADF, errno;

        import mecca.lib.time : toTimespec;
        import mecca.log : DEBUG, WARN;
        // NOTE(review): EV_DELETE and EV_EOF are imported but not used in this function.
        import mecca.reactor.platform : EV_DELETE, EV_ERROR, EV_EOF;

        // Converts a kevent identifier to OSSignal; the contract asserts it lies within
        // OSSignal.min .. OSSignal.max.
        static OSSignal toOSSignal(typeof(kevent64_s.ident) signal)
        in
        {
            ASSERT!"Event signal %s could not be converted to OSSignal"(
                signal >= OSSignal.min && signal <= OSSignal.max, signal
            );
        }
        do
        {
            return cast(OSSignal) signal;
        }

        const spec = timeout.toTimespec();
        const specTimeout = timeout == Duration.max ? null : &spec;

        // One call both applies the pending change batch and fetches triggered events.
        const result = kqueueFd.osCall!kevent64(
            changes.ptr,
            numberOfChanges,
            events.ptr,
            cast(int) events.length,
            0,
            specTimeout
        );
        // The batch is considered consumed whatever the outcome of the call.
        numberOfChanges = 0;

        if (result < 0 && errno == EINTR)
        {
            DEBUG!"kevent64 call interrupted by signal";
            return true;
        }

        errnoEnforceNGC(result >= 0, "kevent64 failed");

        foreach (ref event ; events[0 .. result])
        {
            if (event.flags & EV_ERROR)
            {
                // For error records `data` holds the error code.
                switch(event.data)
                {
                    case EPIPE, EBADF:
                        // Skipped silently; move on to the next event.
                        continue;
                    default:
                        errno = cast(int) event.data;
                        errnoEnforceNGC(false, "event failed");
                }
            }

            // udata is the FdContext pointer stored at registration time.
            auto ctx = cast(FdContext*) event.udata;
            ASSERT!"ctx is null"(ctx !is null);

            // Visit Read, then Write; the switch skips the direction that does not
            // correspond to this event's filter.
            with(Direction) foreach(dir; Read..(Write+1)) {
                final switch(dir) {
                case Read:
                    // Signal events are dispatched through the Read state.
                    if (event.filter != EVFILT_READ && event.filter != EVFILT_SIGNAL)
                        continue;
                    break;
                case Write:
                    if (event.filter != EVFILT_WRITE)
                        continue;
                    break;
                case Both:
                    assert(false);
                }

                auto state = &ctx.states[dir];
                with(FdContext.Type) final switch(state.type)
                {
                case None:
                    if( cast(Direction)dir==Read || event.filter != EVFILT_READ ) {
                        // Since most FDs are available for write most of the time, almost any wakeup would trigger
                        // this warning. As such, we log only if one of two conditions are met:
                        // Either we got this condition on a read, or we got this condition on a write, but the FD is
                        // not read ready.
                        // NOTE(review): in the Write pass the filter is EVFILT_WRITE (anything else was skipped
                        // above), so `event.filter != EVFILT_READ` looks always true there and the warning
                        // appears to be unconditional - confirm the intended filtering.
                        WARN!"kqueue64s for returned fd %s events %s which is not listening for %s"(
                                ctx.fdNum, event.filter, dir);
                    }
                    break;
                case FiberHandle:
                    theReactor.resumeFiber(state.fibHandle);
                    break;
                case Callback:
                    state.callback(state.opaq);
                    break;
                case CallbackOneShot:
                    // Clear the state before the call so the callback runs with the direction idle.
                    state.type = None;
                    state.callback(state.opaq);
                    break;
                case SignalHandler:
                    ASSERT!"Event signal %s was not the same as the registered signal %s"(event.ident == ctx.fdNum, event.ident, ctx.fdNum);
                    state.signalHandler(toOSSignal(event.ident));
                    break;
                }
            }
        }

        return true;
    }
300 
301 private:
302 
303     void internalRegisterFD(int fd, FdContext* ctx, Direction dir) @trusted @nogc
304     {
305         import mecca.reactor.platform : EV_ADD, EV_CLEAR, EV_ENABLE;
306 
307         ASSERT!"ctx is null"(ctx !is null);
308         static immutable short[2] filters = [EVFILT_READ, EVFILT_WRITE];
309 
310         foreach (filter ; filters)
311         {
312             const kevent64_s event = {
313                 ident: fd,
314                 filter: filter,
315                 flags: EV_ADD | EV_ENABLE | EV_CLEAR,
316                 udata: cast(ulong) ctx
317             };
318 
319             const result = queueEvent(event);
320             errnoEnforceNGC(result >= 0, "Adding fd to queue failed");
321         }
322     }
323 
324     void internalDeregisterFD(int fd, FdContext* ctx, bool fdIsClosing) nothrow @trusted @nogc
325     {
326         import core.stdc.errno : errno;
327         import mecca.reactor.platform : EV_DELETE;
328 
329         // Events are automatically removed when a file descriptor is closed.
330         // This will save us one system call.
331         if (fdIsClosing)
332             return;
333 
334         ASSERT!"ctx is null"(ctx !is null);
335         static immutable short[2] filters = [EVFILT_READ, EVFILT_WRITE];
336 
337         foreach (filter ; filters)
338         {
339             const kevent64_s event = {
340                 ident: fd,
341                 filter: filter,
342                 flags: EV_DELETE
343             };
344 
345             const result = queueEvent(event);
346             ASSERT!"Removing fd from queue failed with errno %s"(result >= 0, errno);
347         }
348     }
349 
350     void internalRegisterSignalHandler(FdContext* ctx) @trusted @nogc
351     {
352         import mecca.reactor.platform : EV_ADD, EV_CLEAR, EV_ENABLE;
353 
354         ASSERT!"ctx is null"(ctx !is null);
355 
356         const kevent64_s event = {
357             ident: ctx.fdNum,
358             filter: EVFILT_SIGNAL,
359             flags: EV_ADD | EV_ENABLE | EV_CLEAR,
360             udata: cast(ulong) ctx
361         };
362 
363         const result = queueEvent(event, true);
364         errnoEnforceNGC(result >= 0, "Adding signal to queue failed");
365     }
366 
367     void internalDeregisterSignalHandler(FdContext* ctx) nothrow @trusted @nogc
368     {
369         import core.stdc.errno : errno;
370         import mecca.reactor.platform : EV_DELETE;
371 
372         const kevent64_s event = {
373             ident: ctx.fdNum,
374             filter: EVFILT_SIGNAL,
375             flags: EV_DELETE
376         };
377 
378         const result = queueEvent(event, true);
379         ASSERT!"Removing signal from queue failed with errno %s"(result >= 0, errno);
380     }
381 
382     int queueEvent(const ref kevent64_s event, bool flush = false) nothrow @trusted @nogc
383     {
384         changes[numberOfChanges++] = event;
385 
386         if (numberOfChanges != changes.length && !flush)
387             return 1;
388 
389         const result = kqueueFd.osCall!kevent64(changes.ptr, numberOfChanges,
390             null, 0, 0, null);
391         numberOfChanges = 0;
392 
393         return result;
394     }
395 }
396 
// C binding used by Kqueue.open to create the queue descriptor.
extern (C) int kqueue() nothrow @trusted @nogc;
// C binding for the variadic fcntl call; Kqueue.registerFD uses it to change descriptor flags.
extern (C) int fcntl(int, int, ...) nothrow @trusted @nogc;