module mecca.reactor.platform.kqueue;

version (Kqueue):
package:

alias Poller = Kqueue;

/**
 * Reactor poller backed by a kqueue file descriptor.
 *
 * File descriptors and signals are registered by queueing `kevent64_s` change records in `changes`.
 * Queued records are submitted either when the batch fills up, when a flush is explicitly requested,
 * or together with the next `kevent64` call made by `reactorIdle`.
 */
struct Kqueue
{
    public import mecca.reactor.subsystems.poller : FdContext;

    import core.time : Duration, msecs;

    import mecca.containers.pools : FixedPool;
    import mecca.lib.exception : ASSERT, DBG_ASSERT, enforceNGC, errnoEnforceNGC;
    import mecca.lib.io : FD;
    import mecca.lib.reflection : setToInit;
    import mecca.lib.time : Timeout;
    import mecca.log : INFO, notrace;
    import mecca.platform.os : OSSignal;
    import mecca.reactor : theReactor;
    import mecca.reactor.platform : EVFILT_READ, EVFILT_WRITE, EVFILT_SIGNAL, kevent64, kevent64_s;
    import mecca.reactor.subsystems.poller : Direction;

    package(mecca.reactor) alias SignalHandler = void delegate(OSSignal) @system;

    private
    {
        enum MIN_DURATION = 1.msecs;
        enum NUM_BATCH_EVENTS = 32;
        enum MAX_CONCURRENT_FDS = 512;

        /// The kqueue descriptor itself.
        FD kqueueFd;
        /// Storage for the per-registration contexts handed out to callers.
        FixedPool!(FdContext, MAX_CONCURRENT_FDS) fdPool;

        /// Number of valid entries at the start of `changes`.
        int numberOfChanges;
        /// Change records queued but not yet submitted to the kernel.
        kevent64_s[NUM_BATCH_EVENTS] changes;
        /// Receive buffer for events returned by `kevent64`.
        kevent64_s[NUM_BATCH_EVENTS] events;
    }

    invariant
    {
        // `queueEvent` flushes as soon as the batch becomes full, so there is always room for one more record.
        assert(changes.length > numberOfChanges);
    }

    /**
     * Creates the kqueue descriptor and opens the context pool.
     *
     * Asserts that the reactor is already open. Throws (through `errnoEnforceNGC`) if `kqueue()` fails.
     */
    void open() @safe
    {
        enum assertMessage = "Must call theReactor.setup before calling " ~
            "ReactorFD.openReactor";

        ASSERT!assertMessage(theReactor.isOpen);

        const fd = kqueue();
        errnoEnforceNGC(fd >= 0, "Failed to create kqueue file descriptor");
        kqueueFd = FD(fd);

        fdPool.open();
    }

    /**
     * Closes the kqueue descriptor.
     *
     * Any change records that were queued but never submitted are discarded: they refer to the descriptor
     * that is being closed and must not be submitted to a kqueue created by a later `open`.
     */
    void close()
    {
        kqueueFd.close();
        numberOfChanges = 0;
    }

    /// Returns: `true` while the kqueue descriptor is valid.
    @property bool isOpen() const pure nothrow @safe @nogc
    {
        return kqueueFd.isValid;
    }

    /**
     * Registers a file descriptor for both read and write events.
     *
     * Params:
     *  fd = the descriptor to register
     *  alreadyNonBlocking = when `false`, the descriptor is switched to non-blocking mode first
     *
     * Returns: the context allocated for this registration. Pass it back to `deregisterFd` when done.
     *
     * Throws: through `errnoEnforceNGC` if `fcntl` or the registration fails; the context is released in
     * that case.
     */
    FdContext* registerFD(ref FD fd, bool alreadyNonBlocking = false) @safe @nogc
    {
        import core.sys.posix.fcntl : F_GETFL, F_SETFL, O_NONBLOCK;

        enum reactorMessage = "registerFD called outside of an open reactor";
        enum fdMessage = "registerFD called without first calling " ~
            "ReactorFD.openReactor";

        ASSERT!reactorMessage(theReactor.isOpen);
        ASSERT!fdMessage(kqueueFd.isValid);

        FdContext* ctx = fdPool.alloc();
        setToInit(ctx);
        ctx.fdNum = fd.fileNo;
        scope(failure) fdPool.release(ctx);

        if (!alreadyNonBlocking)
        {
            // F_SETFL replaces the whole set of file status flags, so read the current ones first and only
            // add O_NONBLOCK. FD_CLOEXEC is deliberately not passed here: it is a descriptor flag that
            // belongs to F_SETFD and is not a valid F_SETFL status flag.
            const currentFlags = fcntl(fd.fileNo, F_GETFL, 0);
            errnoEnforceNGC(currentFlags >= 0, "Failed to read fd status flags");

            const res = fcntl(fd.fileNo, F_SETFL, currentFlags | O_NONBLOCK);
            errnoEnforceNGC(res >= 0, "Failed to set fd to non-blocking mode");
        }

        internalRegisterFD(fd.fileNo, ctx, Direction.Both);

        return ctx;
    }

    /**
     * Removes a descriptor registered with `registerFD` and releases its context.
     *
     * Params:
     *  fd = the descriptor that was registered
     *  ctx = the context returned by `registerFD`
     *  fdIsClosing = when `true`, no delete records are queued (see `internalDeregisterFD`)
     */
    void deregisterFd(ref FD fd, FdContext* ctx, bool fdIsClosing = false) nothrow @safe @nogc
    {
        internalDeregisterFD(fd.fileNo, ctx, fdIsClosing);

        fdPool.release(ctx);
    }

    /**
     * Suspends the current fiber until the descriptor reports an event in direction `dir`, or until
     * `timeout` expires.
     *
     * Params:
     *  ctx = the context returned by `registerFD`
     *  fd = descriptor number, used for diagnostics only
     *  dir = `Direction.Read` or `Direction.Write`; `Direction.Both` is rejected
     *  timeout = passed to `theReactor.suspendCurrentFiber`
     */
    void waitForEvent(FdContext* ctx, int fd, Direction dir, Timeout timeout = Timeout.infinite) @safe @nogc
    {
        // XXX Relax this restriction if there is a need
        DBG_ASSERT!"Cannot wait for both in and out events"(dir != Direction.Both);
        auto ctxState = &ctx.states[dir];

        // The slot must be free: any other state means somebody else is already listening.
        with(FdContext.Type) final switch(ctxState.type) {
        case None:
            break;
        case FiberHandle:
            ASSERT!"Two fibers cannot wait on the same ReactorFD %s at once: %s asked to wait with %s already waiting"(
                    false, fd, theReactor.currentFiberHandle.fiberId, ctxState.fibHandle.fiberId );
            break;
        case Callback:
        case CallbackOneShot:
            ASSERT!"Cannot wait on FD %s already waiting on a callback"(false, fd, dir);
            break;
        case SignalHandler:
            ASSERT!"Cannot wait on signal %s already waiting on a signal handler"(false, fd);
            break;
        }

        ctxState.type = FdContext.Type.FiberHandle;
        // Free the slot again however the suspension ends (event, timeout or exception).
        scope(exit) ctxState.type = FdContext.Type.None;
        ctxState.fibHandle = theReactor.currentFiberHandle;

        theReactor.suspendCurrentFiber(timeout);
    }

    /**
     * Installs a callback to be invoked from `reactorIdle` when the descriptor reports an event.
     *
     * Params:
     *  ctx = the context returned by `registerFD`
     *  dir = `Direction.Read` or `Direction.Write`; `Direction.Both` is rejected
     *  callback = delegate to invoke; must not be `null`
     *  opaq = value passed to `callback`
     *  oneShot = when `true`, the slot is cleared before the callback is invoked for the first time
     */
    @notrace void registerFdCallback(
        FdContext* ctx, Direction dir, void delegate(void*) callback, void* opaq, bool oneShot)
            nothrow @trusted @nogc
    {
        DBG_ASSERT!"Direction may not be Both"(dir!=Direction.Both);
        auto state = &ctx.states[dir];
        INFO!"Registered callback %s on fd %s one shot %s"(&callback, ctx.fdNum, oneShot);
        ASSERT!"Trying to register callback on busy FD %s: state %s"(
                state.type==FdContext.Type.None, ctx.fdNum, state.type );
        ASSERT!"Cannot register a null callback on FD %s"( callback !is null, ctx.fdNum );

        state.type = oneShot ? FdContext.Type.CallbackOneShot : FdContext.Type.Callback;
        state.callback = callback;
        state.opaq = opaq;
    }

    /**
     * Removes a callback installed with `registerFdCallback`.
     *
     * Asserts that the slot for `dir` currently holds a callback (one-shot or persistent).
     */
    @notrace void unregisterFdCallback(FdContext* ctx, Direction dir) nothrow @trusted @nogc {
        DBG_ASSERT!"Direction may not be Both"(dir!=Direction.Both);
        auto state = &ctx.states[dir];
        INFO!"Unregistered callback on fd %s"(ctx.fdNum);
        ASSERT!"Trying to deregister callback on non-registered FD %s: state %s"(
                state.type==FdContext.Type.Callback || state.type==FdContext.Type.CallbackOneShot, ctx.fdNum, state.type);

        state.type = FdContext.Type.None;
    }

    /**
     * Registers a handler for an OS signal through `EVFILT_SIGNAL`.
     *
     * The handler is stored in the `Direction.Read` slot of a freshly allocated context, whose `fdNum`
     * holds the signal number.
     *
     * Returns: the context for this registration. Pass it to `unregisterSignalHandler` when done.
     */
    package(mecca.reactor) @notrace FdContext* registerSignalHandler(OSSignal signal, SignalHandler handler)
            @trusted @nogc
    {
        enum reactorMessage = "registerSignalHandler called outside of an open reactor";
        enum fdMessage = "registerSignalHandler called without first calling " ~
            "ReactorFD.openReactor";

        ASSERT!reactorMessage(theReactor.isOpen);
        ASSERT!fdMessage(kqueueFd.isValid);

        auto ctx = fdPool.alloc();
        setToInit(ctx);

        auto state = &ctx.states[Direction.Read];
        state.type = FdContext.Type.SignalHandler;
        state.signalHandler = handler;
        ctx.fdNum = signal;
        scope(failure) fdPool.release(ctx);

        internalRegisterSignalHandler(ctx);
        return ctx;
    }

    /// Removes a handler registered with `registerSignalHandler` and releases its context.
    package(mecca.reactor) @notrace void unregisterSignalHandler(FdContext* ctx) nothrow @safe @nogc
    {
        internalDeregisterSignalHandler(ctx);
        fdPool.release(ctx);
    }

    /// Export of the poller function
    ///
    /// A variation of this function is what's called by the reactor idle callback (unless
    /// `OpenOptions.registerDefaultIdler` is set to `false`).
    @notrace void poll()
    {
        reactorIdle(Duration.zero);
    }

    /**
     * Submits the queued change records, waits for events and dispatches them.
     *
     * Params:
     *  timeout = how long to wait for events; `Duration.max` means no timeout is passed to `kevent64`
     *
     * Returns: always `true`.
     *
     * Throws: through `errnoEnforceNGC` if `kevent64` fails with anything but `EINTR`, or if an event
     * carries an error other than `EPIPE` or `EBADF`.
     */
    @notrace bool reactorIdle(Duration timeout)
    {
        import core.stdc.errno : EINTR, EPIPE, EBADF, errno;

        import mecca.lib.time : toTimespec;
        import mecca.log : DEBUG, WARN;
        import mecca.reactor.platform : EV_ERROR;

        static OSSignal toOSSignal(typeof(kevent64_s.ident) signal)
        in
        {
            ASSERT!"Event signal %s could not be converted to OSSignal"(
                signal >= OSSignal.min && signal <= OSSignal.max, signal
            );
        }
        do
        {
            return cast(OSSignal) signal;
        }

        const spec = timeout.toTimespec();
        const specTimeout = timeout == Duration.max ? null : &spec;

        // One call both submits the pending changes and collects ready events.
        const result = kqueueFd.osCall!kevent64(
            changes.ptr,
            numberOfChanges,
            events.ptr,
            cast(int) events.length,
            0,
            specTimeout
        );
        numberOfChanges = 0;

        if (result < 0 && errno == EINTR)
        {
            DEBUG!"kevent64 call interrupted by signal";
            return true;
        }

        errnoEnforceNGC(result >= 0, "kevent64 failed");

        foreach (ref event ; events[0 .. result])
        {
            if (event.flags & EV_ERROR)
            {
                switch(event.data)
                {
                    case EPIPE, EBADF:
                        // Skip this event entirely; `continue` applies to the enclosing foreach.
                        continue;
                    default:
                        errno = cast(int) event.data;
                        errnoEnforceNGC(false, "event failed");
                }
            }

            auto ctx = cast(FdContext*) event.udata;
            ASSERT!"ctx is null"(ctx !is null);

            with(Direction) foreach(dir; Read..(Write+1)) {
                // Each kqueue event carries exactly one filter; map it to the matching direction slot.
                final switch(dir) {
                case Read:
                    if (event.filter != EVFILT_READ && event.filter != EVFILT_SIGNAL)
                        continue;
                    break;
                case Write:
                    if (event.filter != EVFILT_WRITE)
                        continue;
                    break;
                case Both:
                    assert(false);
                }

                auto state = &ctx.states[dir];
                with(FdContext.Type) final switch(state.type)
                {
                case None:
                    // NOTE(review): at this point the filter already matches `dir`, so for a write event
                    // `event.filter != EVFILT_READ` always holds and this condition is always true.
                    // The original intent is described below — confirm whether write wakeups should
                    // really be logged.
                    if( cast(Direction)dir==Read || event.filter != EVFILT_READ ) {
                        // Since most FDs are available for write most of the time, almost any wakeup would trigger
                        // this warning. As such, we log only if one of two conditions are met:
                        // Either we got this condition on a read, or we got this condition on a write, but the FD is
                        // not read ready.
                        WARN!"kqueue64s for returned fd %s events %s which is not listening for %s"(
                                ctx.fdNum, event.filter, dir);
                    }
                    break;
                case FiberHandle:
                    theReactor.resumeFiber(state.fibHandle);
                    break;
                case Callback:
                    state.callback(state.opaq);
                    break;
                case CallbackOneShot:
                    // Clear the slot first so the callback may register again from inside its body.
                    state.type = None;
                    state.callback(state.opaq);
                    break;
                case SignalHandler:
                    ASSERT!"Event signal %s was not the same as the registered signal %s"(event.ident == ctx.fdNum, event.ident, ctx.fdNum);
                    state.signalHandler(toOSSignal(event.ident));
                    break;
                }
            }
        }

        return true;
    }

private:

    /**
     * Queues add records for the read and write filters of `fd`, with `ctx` as their user data.
     *
     * `dir` is currently not consulted: both filters are always added.
     */
    void internalRegisterFD(int fd, FdContext* ctx, Direction dir) @trusted @nogc
    {
        import mecca.reactor.platform : EV_ADD, EV_CLEAR, EV_ENABLE;

        ASSERT!"ctx is null"(ctx !is null);
        static immutable short[2] filters = [EVFILT_READ, EVFILT_WRITE];

        foreach (filter ; filters)
        {
            const kevent64_s event = {
                ident: fd,
                filter: filter,
                flags: EV_ADD | EV_ENABLE | EV_CLEAR,
                udata: cast(ulong) ctx
            };

            const result = queueEvent(event);
            errnoEnforceNGC(result >= 0, "Adding fd to queue failed");
        }
    }

    /// Queues delete records for the read and write filters of `fd`, unless the descriptor is closing.
    void internalDeregisterFD(int fd, FdContext* ctx, bool fdIsClosing) nothrow @trusted @nogc
    {
        import core.stdc.errno : errno;
        import mecca.reactor.platform : EV_DELETE;

        // Events are automatically removed when a file descriptor is closed.
        // This will save us one system call.
        if (fdIsClosing)
            return;

        ASSERT!"ctx is null"(ctx !is null);
        static immutable short[2] filters = [EVFILT_READ, EVFILT_WRITE];

        foreach (filter ; filters)
        {
            const kevent64_s event = {
                ident: fd,
                filter: filter,
                flags: EV_DELETE
            };

            const result = queueEvent(event);
            ASSERT!"Removing fd from queue failed with errno %s"(result >= 0, errno);
        }
    }

    /// Submits (with an immediate flush) an add record for the signal stored in `ctx.fdNum`.
    void internalRegisterSignalHandler(FdContext* ctx) @trusted @nogc
    {
        import mecca.reactor.platform : EV_ADD, EV_CLEAR, EV_ENABLE;

        ASSERT!"ctx is null"(ctx !is null);

        const kevent64_s event = {
            ident: ctx.fdNum,
            filter: EVFILT_SIGNAL,
            flags: EV_ADD | EV_ENABLE | EV_CLEAR,
            udata: cast(ulong) ctx
        };

        const result = queueEvent(event, true);
        errnoEnforceNGC(result >= 0, "Adding signal to queue failed");
    }

    /// Submits (with an immediate flush) a delete record for the signal stored in `ctx.fdNum`.
    void internalDeregisterSignalHandler(FdContext* ctx) nothrow @trusted @nogc
    {
        import core.stdc.errno : errno;
        import mecca.reactor.platform : EV_DELETE;

        const kevent64_s event = {
            ident: ctx.fdNum,
            filter: EVFILT_SIGNAL,
            flags: EV_DELETE
        };

        const result = queueEvent(event, true);
        ASSERT!"Removing signal from queue failed with errno %s"(result >= 0, errno);
    }

    /**
     * Appends a change record to the pending batch.
     *
     * The batch is submitted to the kernel immediately when it becomes full or when `flush` is `true`;
     * otherwise the record waits for a later flush or for the next `reactorIdle`.
     *
     * Returns: `1` when the record was only queued, otherwise the result of the `kevent64` call.
     */
    int queueEvent(const ref kevent64_s event, bool flush = false) nothrow @trusted @nogc
    {
        changes[numberOfChanges++] = event;

        if (numberOfChanges != changes.length && !flush)
            return 1;

        // Submit only: no event buffer is supplied, so nothing is dequeued here.
        const result = kqueueFd.osCall!kevent64(changes.ptr, numberOfChanges,
            null, 0, 0, null);
        numberOfChanges = 0;

        return result;
    }
}

extern (C) int kqueue() nothrow @trusted @nogc;
extern (C) int fcntl(int, int, ...) nothrow @trusted @nogc;