module mecca.reactor.platform.linux.poller;

// Licensed under the Boost license. Full copyright information in the AUTHORS file

version (linux):
package(mecca.reactor.platform):

import core.stdc.errno;
import core.sys.linux.epoll;
import core.sys.posix.fcntl;
import unistd = core.sys.posix.unistd;
import std.conv;
import std.exception;
import std.meta;
import std.traits;
import std.string;

import mecca.containers.pools;
import mecca.lib.io;
import mecca.lib.exception;
import mecca.lib.reflection;
import mecca.lib.time;
import mecca.log;
import mecca.reactor;
import mecca.reactor.subsystems.poller : Direction;

//
// epoll subsystems (sleep in the kernel until events occur)
//

// Definitions missing from the phobos headers or lacking nothrow @nogc
private extern(C) {
    int epoll_create1 (int flags) nothrow @trusted @nogc;
    int fcntl(int, int, ...) nothrow @trusted @nogc;
    /+
    int epoll_pwait(int epfd, epoll_event* events,
            int maxevents, int timeout,
            const sigset_t *sigmask);
    +/
}

/// epoll based poller: tracks registered file descriptors and dispatches their readiness events to the fiber or
/// callback that is waiting on them.
struct Poller {
    public import mecca.reactor.subsystems.poller : FdContext;

private: // Not that this does anything, as the struct itself is only visible to this file.
    // Prevent accidental copying
    @disable this(this);

    FD epollFd;
    FixedPool!(FdContext, MAX_CONCURRENT_FDS) fdPool;

    enum MIN_DURATION = dur!"msecs"(1);
    enum NUM_BATCH_EVENTS = 32;     // Maximal number of events fetched by a single epoll_wait call
    enum MAX_CONCURRENT_FDS = 512;  // Capacity of the FdContext pool

public:

    /// Create the epoll file descriptor and open the FdContext pool.
    ///
    /// The reactor must already be open. Throws an errno based exception if `epoll_create1` fails.
    void open() @safe {
        ASSERT!"Must call theReactor.setup before calling ReactorFD.openReactor"(theReactor.isOpen);
        int epollFdOs = epoll_create1(EPOLL_CLOEXEC);
        errnoEnforceNGC( epollFdOs>=0, "Failed to create epoll fd" );
        epollFd = FD(epollFdOs);

        fdPool.open();
    }

    /// Close the epoll file descriptor.
    void close() {
        // NOTE(review): fdPool is opened in open() but is not closed here. Confirm whether the pool requires an
        // explicit close before the poller can be reopened.
        epollFd.close();
    }

    /// Report whether the epoll file descriptor is currently valid.
    @property bool isOpen() const pure nothrow @safe @nogc {
        return epollFd.isValid;
    }

    /// Register a file descriptor with the poller.
    ///
    /// Params:
    ///  fd = the file descriptor to register
    ///  alreadyNonBlocking = when `true`, the fd's flags are left untouched. Otherwise the fd is switched to
    ///     non-blocking mode and marked close-on-exec.
    ///
    /// Returns:
    ///  The context allocated for this fd. Pass it to the other poller calls, and to `deregisterFd` when done.
    ///
    /// Throws:
    ///  An errno based exception if changing the fd's flags or adding it to epoll fails. The context is returned to
    ///  the pool in that case.
    FdContext* registerFD(ref FD fd, bool alreadyNonBlocking = false) @safe @nogc {
        ASSERT!"registerFD called outside of an open reactor"( theReactor.isOpen );
        ASSERT!"registerFD called without first calling ReactorFD.openReactor"( epollFd.isValid );
        FdContext* ctx = fdPool.alloc();
        setToInit(ctx);
        ctx.fdNum = fd.fileNo;
        scope(failure) fdPool.release(ctx);

        if( !alreadyNonBlocking ) {
            // F_SETFL replaces the whole set of file status flags, so merge O_NONBLOCK into the current ones rather
            // than dropping flags such as O_APPEND.
            int statusFlags = .fcntl(fd.fileNo, F_GETFL);
            errnoEnforceNGC( statusFlags>=0, "Failed to get fd status flags" );
            int res = .fcntl(fd.fileNo, F_SETFL, statusFlags | O_NONBLOCK);
            errnoEnforceNGC( res>=0, "Failed to set fd to non-blocking mode" );

            // FD_CLOEXEC is a file descriptor flag, not a file status flag. It must be set through F_SETFD; passing
            // it to F_SETFL has no effect.
            int fdFlags = .fcntl(fd.fileNo, F_GETFD);
            errnoEnforceNGC( fdFlags>=0, "Failed to get fd descriptor flags" );
            res = .fcntl(fd.fileNo, F_SETFD, fdFlags | FD_CLOEXEC);
            errnoEnforceNGC( res>=0, "Failed to set fd to close-on-exec mode" );
        }

        internalRegisterFD(fd.fileNo, ctx, Direction.Both);

        return ctx;
    }

    /// Remove a file descriptor from the poller and return its context to the pool.
    ///
    /// Params:
    ///  fd = the file descriptor to remove
    ///  ctx = the context returned by `registerFD` for this fd
    ///  fdIsClosing = unused by this implementation
    void deregisterFd(ref FD fd, FdContext* ctx, bool fdIsClosing = false) nothrow @safe @nogc {
        // fdIsClosing is only used for kqueue
        internalDeregisterFD(fd.fileNo, ctx);

        fdPool.release(ctx);
    }

    /// Suspend the current fiber until the fd reports an event in the requested direction, or the timeout expires.
    ///
    /// Params:
    ///  ctx = the context returned by `registerFD`
    ///  fd = the file descriptor number. Used only for error messages
    ///  dir = direction to wait for. May not be `Direction.Both`
    ///  timeout = passed to `theReactor.suspendCurrentFiber`
    void waitForEvent(FdContext* ctx, int fd, Direction dir, Timeout timeout = Timeout.infinite) @safe @nogc {
        // XXX Relax this restriction if there is a need
        DBG_ASSERT!"Cannot wait for both in and out events"(dir != Direction.Both);
        auto ctxState = &ctx.states[dir];
        // Only an idle direction may be waited on. Every other state is an error.
        with(FdContext.Type) final switch(ctxState.type) {
        case None:
            break;
        case FiberHandle:
            ASSERT!(
                    "Two fibers cannot wait on the same ReactorFD %s direction %s at once: %s asked to wait with %s " ~
                    "already waiting")
                    ( false, fd, dir, theReactor.currentFiberHandle.fiberId, ctxState.fibHandle.fiberId );
            break;
        case Callback:
        case CallbackOneShot:
            ASSERT!"Cannot wait on FD %s direction %s already waiting on a callback"(false, fd, dir);
            break;
        case SignalHandler:
            ASSERT!"Should never happen for epoll, SignalHandler is kqueue only"(false);
            break;
        }
        ctxState.type = FdContext.Type.FiberHandle;
        // Mark the direction idle again however the suspend ends (event, timeout or exception)
        scope(exit) ctxState.type = FdContext.Type.None;
        ctxState.fibHandle = theReactor.currentFiberHandle;

        theReactor.suspendCurrentFiber(timeout);
    }

    /// Register a callback to be invoked from `reactorIdle` when the fd reports an event in the given direction.
    ///
    /// Params:
    ///  ctx = the context returned by `registerFD`
    ///  dir = direction to listen on. May not be `Direction.Both`
    ///  callback = delegate to invoke. May not be null
    ///  opaq = value passed to `callback` on invocation
    ///  oneShot = when `true`, the registration is cleared right before the first invocation
    @notrace void registerFdCallback(
            FdContext* ctx, Direction dir, void delegate(void*) callback, void* opaq, bool oneShot)
            nothrow @trusted @nogc
    {
        DBG_ASSERT!"Direction may not be Both"(dir!=Direction.Both);
        auto state = &ctx.states[dir];
        INFO!"Registered callback %s on fd %s one shot %s"(&callback, ctx.fdNum, oneShot);
        ASSERT!"Trying to register callback on busy FD %s: state %s"(
                state.type==FdContext.Type.None, ctx.fdNum, state.type );
        ASSERT!"Cannot register a null callback on FD %s"( callback !is null, ctx.fdNum );

        state.type = oneShot ? FdContext.Type.CallbackOneShot : FdContext.Type.Callback;
        state.callback = callback;
        state.opaq = opaq;
    }

    /// Remove a callback previously registered with `registerFdCallback` for the given direction.
    @notrace void unregisterFdCallback(FdContext* ctx, Direction dir) nothrow @trusted @nogc {
        DBG_ASSERT!"Direction may not be Both"(dir!=Direction.Both);
        auto state = &ctx.states[dir];
        INFO!"Unregistered callback on fd %s"(ctx.fdNum);
        ASSERT!"Trying to deregister callback on non-registered FD %s: state %s"(
                state.type==FdContext.Type.Callback || state.type==FdContext.Type.CallbackOneShot, ctx.fdNum, state.type);

        state.type = FdContext.Type.None;
    }

    /// Export of the poller function
    ///
    /// A variation of this function is what's called by the reactor idle callback (unless `OpenOptions.registerDefaultIdler`
    /// is set to `false`).
    @notrace void poll() {
        reactorIdle(Duration.zero);
    }

    /// Wait for events on the registered fds and dispatch them.
    ///
    /// Params:
    ///  timeout = how long to sleep in the kernel if no event is pending. `Duration.max` means wait forever.
    ///     Zero or negative means do not block. Positive durations under a millisecond are rounded up to one
    ///     millisecond, and durations too long for `epoll_wait` are clamped to the maximum it accepts.
    ///
    /// Returns:
    ///  Always `true`.
    ///
    /// Throws:
    ///  An errno based exception if `epoll_wait` fails for any reason other than `EINTR`.
    @notrace bool reactorIdle(Duration timeout) {
        int intTimeout;
        if( timeout == Duration.max ) {
            intTimeout = -1;
        } else if( timeout <= Duration.zero ) {
            // epoll_wait treats a negative timeout as "block forever", so a negative duration must not reach it
            intTimeout = 0;
        } else {
            long msecs = timeout.total!"msecs";
            if( msecs == 0 ) {
                // Do not turn a short sleep into a busy poll
                msecs = 1;
            }
            // Clamp instead of throwing a conversion overflow on very long (but finite) timeouts
            intTimeout = msecs > int.max ? int.max : cast(int)msecs;
        }

        epoll_event[NUM_BATCH_EVENTS] events;
        int res = epollFd.osCall!epoll_wait(events.ptr, NUM_BATCH_EVENTS, intTimeout);

        if( res<0 && errno==EINTR ) {
            DEBUG!"epoll call interrupted by signal"();
            return true;
        }

        errnoEnforceNGC( res>=0, "epoll_wait failed" );

        foreach( ref event; events[0..res] ) {
            auto ctx = cast(FdContext*)event.data.ptr;

            with(Direction) foreach(dir; Read..(Write+1)) {
                // Skip directions the event does not apply to.
                // NOTE(review): EPOLLERR and EPOLLRDHUP are not tested here, so an event carrying only those bits
                // wakes nobody. Confirm this is intended.
                final switch(dir) {
                case Read:
                    if( (event.events & (EPOLLIN | EPOLLHUP))==0 )
                        continue;
                    break;
                case Write:
                    if( (event.events & EPOLLOUT)==0 )
                        continue;
                    break;
                case Both:
                    assert(false);
                }

                auto state = &ctx.states[dir];
                with(FdContext.Type) final switch(state.type) {
                case None:
                    if( cast(Direction)dir==Read || (event.events & (EPOLLIN | EPOLLHUP))==0 ) {
                        // Since most FDs are available for write most of the time, almost any wakeup would trigger
                        // this warning. As such, we log only if one of two conditions are met:
                        // Either we got this condition on a read, or we got this condition on a write, but the FD is
                        // not read ready.
                        WARN!"epoll for returned fd %s events %x which is not listening for %s"(
                                ctx.fdNum, event.events, cast(Direction)dir);
                    }
                    break;
                case FiberHandle:
                    theReactor.resumeFiber(state.fibHandle);
                    break;
                case Callback:
                    state.callback(state.opaq);
                    break;
                case CallbackOneShot:
                    // Clear the registration first, so the callback may register a new one
                    state.type = None;
                    state.callback(state.opaq);
                    break;
                case SignalHandler:
                    ASSERT!"Should never happen for epoll, SignalHandler is kqueue only"(false);
                    break;
                }
            }
        }

        return true;
    }

private:
    /// Add `fd` to the epoll set, edge triggered, with `ctx` as the event's user data.
    ///
    /// `dir` is not consulted: the fd is always registered for both read and write events.
    @notrace void internalRegisterFD(int fd, FdContext* ctx, Direction dir) @trusted @nogc {
        epoll_event event = void;
        event.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLET; // Register with Edge Trigger behavior
        event.data.ptr = ctx;
        int res = epollFd.osCall!epoll_ctl(EPOLL_CTL_ADD, fd, &event);
        errnoEnforceNGC( res>=0, "Adding fd to epoll failed" );
    }

    /// Remove `fd` from the epoll set. `ctx` is not consulted.
    @notrace void internalDeregisterFD(int fd, FdContext* ctx) nothrow @trusted @nogc {
        int res = epollFd.osCall!epoll_ctl(EPOLL_CTL_DEL, fd, null);

        // There is no reason for a registered FD to fail removal, so we assert instead of throwing
        ASSERT!"Removing fd from epoll failed with errno %s"( res>=0, errno );
    }
}

// Unit test in mecca.reactor.io