1 module mecca.reactor.platform.linux.poller;
2 
3 // Licensed under the Boost license. Full copyright information in the AUTHORS file
4 
5 version (linux):
6 package(mecca.reactor.platform):
7 
8 import core.stdc.errno;
9 import core.sys.linux.epoll;
10 import core.sys.posix.fcntl;
11 import unistd = core.sys.posix.unistd;
12 import std.conv;
13 import std.exception;
14 import std.meta;
15 import std.traits;
16 import std..string;
17 
18 import mecca.containers.pools;
19 import mecca.lib.io;
20 import mecca.lib.exception;
21 import mecca.lib.reflection;
22 import mecca.lib.time;
23 import mecca.log;
24 import mecca.reactor;
25 import mecca.reactor.subsystems.poller : Direction;
26 
27 //
28 // epoll subsystems (sleep in the kernel until events occur)
29 //
30 
// Declarations missing from the druntime (core.sys.*) headers, or declared there without nothrow @nogc
private extern(C) nothrow @trusted @nogc {
    /// epoll_create1(2): create an epoll instance. `flags` may contain EPOLL_CLOEXEC.
    int epoll_create1(int flags);

    /// fcntl(2), redeclared so it can be called from nothrow @nogc code.
    int fcntl(int fd, int cmd, ...);

    // Not used at the moment; kept for reference:
    // int epoll_pwait(int epfd, epoll_event* events, int maxevents, int timeout, const sigset_t* sigmask);
}
41 
/// epoll based implementation of the reactor's FD poller (Linux only).
///
/// Each registered FD gets an `FdContext`, allocated from a fixed size pool, which records per direction what is
/// waiting for the FD to become ready: a suspended fiber or a callback. `reactorIdle` sleeps in `epoll_wait` and
/// dispatches the returned readiness events to those waiters.
struct Poller {
    public import mecca.reactor.subsystems.poller : FdContext;

private: // D's `private` is module scoped: the struct is visible to the whole package, these members are not.
    // Prevent accidental copying
    @disable this(this);

    FD epollFd;
    FixedPool!(FdContext, MAX_CONCURRENT_FDS) fdPool;

    enum MIN_DURATION = dur!"msecs"(1);     // epoll_wait's timeout granularity; shorter non-zero waits round up to it
    enum NUM_BATCH_EVENTS = 32;             // Maximal number of events fetched by a single epoll_wait call
    enum MAX_CONCURRENT_FDS = 512;          // Size of fdPool, presumably the maximal number of concurrently registered FDs

public:

    /// Create the epoll instance and open the `FdContext` pool.
    ///
    /// Must be called after `theReactor.setup`. Throws (via `errnoEnforceNGC`) if `epoll_create1` fails.
    void open() @safe {
        ASSERT!"Must call theReactor.setup before calling ReactorFD.openReactor"(theReactor.isOpen);
        int epollFdOs = epoll_create1(EPOLL_CLOEXEC);
        errnoEnforceNGC( epollFdOs>=0, "Failed to create epoll fd" );
        epollFd = FD(epollFdOs);

        fdPool.open();
    }

    /// Close the epoll file descriptor.
    ///
    /// NOTE(review): `fdPool` is not closed here, even though `open` opens it. Confirm this is intended.
    void close() {
        epollFd.close();
    }

    /// Whether `open` was called (and `close` was not called since).
    @property bool isOpen() const pure nothrow @safe @nogc {
        return epollFd.isValid;
    }

    /// Register `fd` with epoll (edge triggered, read and write) and return its context.
    ///
    /// Params:
    ///     fd = the file descriptor to register.
    ///     alreadyNonBlocking = when `false`, `O_NONBLOCK` is added to the FD's file status flags.
    /// Returns: a context allocated from the pool. Hand it back with `deregisterFd`.
    /// Throws: via `errnoEnforceNGC` if `fcntl` or `epoll_ctl` fails. The context is returned to the pool in that case.
    FdContext* registerFD(ref FD fd, bool alreadyNonBlocking = false) @safe @nogc {
        ASSERT!"registerFD called outside of an open reactor"( theReactor.isOpen );
        ASSERT!"registerFD called without first calling ReactorFD.openReactor"( epollFd.isValid );
        FdContext* ctx = fdPool.alloc();
        setToInit(ctx);
        ctx.fdNum = fd.fileNo;
        scope(failure) fdPool.release(ctx);

        if( !alreadyNonBlocking ) {
            // F_SETFL replaces *all* file status flags. Read the current ones first and only add O_NONBLOCK, so that
            // flags such as O_APPEND set by whoever opened the FD are not silently cleared.
            int flags = .fcntl(fd.fileNo, F_GETFL);
            errnoEnforceNGC( flags>=0, "Failed to set fd to non-blocking mode" );
            int res = .fcntl(fd.fileNo, F_SETFL, flags | O_NONBLOCK);
            errnoEnforceNGC( res>=0, "Failed to set fd to non-blocking mode" );
            // NOTE(review): close-on-exec is not set here. FD_CLOEXEC is a descriptor flag (F_SETFD), and F_SETFL
            // ignores it. Confirm whether registered FDs are expected to be close-on-exec.
        }

        internalRegisterFD(fd.fileNo, ctx, Direction.Both);

        return ctx;
    }

    /// Remove `fd` from epoll and return `ctx` to the pool.
    ///
    /// `ctx` must not be used after this call.
    void deregisterFd(ref FD fd, FdContext* ctx, bool fdIsClosing = false) nothrow @safe @nogc {
        // fdIsClosing is only used for kqueue
        internalDeregisterFD(fd.fileNo, ctx);

        fdPool.release(ctx);
    }

    /// Suspend the current fiber until the FD becomes ready in direction `dir`, or until `timeout` expires.
    ///
    /// Only one waiter (fiber or callback) is allowed per FD and direction, and `dir` may not be `Both`.
    /// Timeout handling is whatever `theReactor.suspendCurrentFiber` does (presumably it throws). Either way the
    /// direction's state is reset to `None` on exit.
    void waitForEvent(FdContext* ctx, int fd, Direction dir, Timeout timeout = Timeout.infinite) @safe @nogc {
        // XXX Relax this restriction if there is a need
        DBG_ASSERT!"Cannot wait for both in and out events"(dir != Direction.Both);
        auto ctxState = &ctx.states[dir];
        with(FdContext.Type) final switch(ctxState.type) {
        case None:
            break;
        case FiberHandle:
            ASSERT!(
                    "Two fibers cannot wait on the same ReactorFD %s direction %s at once: %s asked to wait with %s " ~
                    "already waiting")
                    ( false, fd, dir, theReactor.currentFiberHandle.fiberId, ctxState.fibHandle.fiberId );
            break;
        case Callback:
        case CallbackOneShot:
            ASSERT!"Cannot wait on FD %s direction %s already waiting on a callback"(false, fd, dir);
            break;
        case SignalHandler:
            ASSERT!"Should never happen for epoll, SignalHandler is kqueue only"(false);
            break;
        }
        ctxState.type = FdContext.Type.FiberHandle;
        scope(exit) ctxState.type = FdContext.Type.None;
        ctxState.fibHandle = theReactor.currentFiberHandle;

        theReactor.suspendCurrentFiber(timeout);
    }

    /// Arrange for `callback(opaq)` to be called whenever the FD becomes ready in direction `dir`.
    ///
    /// With `oneShot`, the registration is cleared just before the callback is invoked. The direction must not
    /// already have a waiter, and `callback` must not be null.
    @notrace void registerFdCallback(
            FdContext* ctx, Direction dir, void delegate(void*) callback, void* opaq, bool oneShot)
            nothrow @trusted @nogc
    {
        DBG_ASSERT!"Direction may not be Both"(dir!=Direction.Both);
        auto state = &ctx.states[dir];
        INFO!"Registered callback %s on fd %s one shot %s"(&callback, ctx.fdNum, oneShot);
        ASSERT!"Trying to register callback on busy FD %s: state %s"(
                state.type==FdContext.Type.None, ctx.fdNum, state.type );
        ASSERT!"Cannot register a null callback on FD %s"( callback !is null, ctx.fdNum );

        state.type = oneShot ? FdContext.Type.CallbackOneShot : FdContext.Type.Callback;
        state.callback = callback;
        state.opaq = opaq;
    }

    /// Remove a callback previously set with `registerFdCallback` for direction `dir`.
    @notrace void unregisterFdCallback(FdContext* ctx, Direction dir) nothrow @trusted @nogc {
        DBG_ASSERT!"Direction may not be Both"(dir!=Direction.Both);
        auto state = &ctx.states[dir];
        INFO!"Unregistered callback on fd %s"(ctx.fdNum);
        ASSERT!"Trying to deregister callback on non-registered FD %s: state %s"(
                state.type==FdContext.Type.Callback || state.type==FdContext.Type.CallbackOneShot, ctx.fdNum, state.type);

        state.type = FdContext.Type.None;
    }

    /// Export of the poller function
    ///
    /// A variation of this function is what's called by the reactor idle callback (unless `OpenOptions.registerDefaultIdler`
    /// is set to `false`).
    @notrace void poll() {
        reactorIdle(Duration.zero);
    }

    /// Wait up to `timeout` for FD events and dispatch them to the registered fibers/callbacks.
    ///
    /// `Duration.max` waits until an event arrives, and zero or negative durations do not sleep at all.
    /// Returns: always `true`.
    /// Throws: via `errnoEnforceNGC` if `epoll_wait` fails with anything other than `EINTR`.
    @notrace bool reactorIdle(Duration timeout) {
        // Translate the timeout into epoll_wait's int milliseconds:
        //  * Duration.max means "sleep until an event arrives" (-1).
        //  * Zero or negative means "don't sleep". Any negative value passed to epoll_wait means "wait forever",
        //    so it must never reach it.
        //  * Sub-millisecond timeouts are rounded up to 1ms rather than down to 0 (which would busy poll).
        //  * Timeouts that do not fit in an int are clamped instead of throwing.
        int intTimeout;
        if( timeout == Duration.max ) {
            intTimeout = -1;
        } else if( timeout <= Duration.zero ) {
            intTimeout = 0;
        } else if( timeout < MIN_DURATION ) {
            intTimeout = 1;
        } else {
            immutable long msecs = timeout.total!"msecs";
            intTimeout = msecs > int.max ? int.max : cast(int)msecs;
        }

        epoll_event[NUM_BATCH_EVENTS] events;
        int res = epollFd.osCall!epoll_wait(events.ptr, NUM_BATCH_EVENTS, intTimeout);

        if( res<0 &&  errno==EINTR ) {
            DEBUG!"epoll call interrupted by signal"();
            return true;
        }

        errnoEnforceNGC( res>=0, "epoll_wait failed" );

        // Event bits that wake a waiter in each direction. The kernel always reports EPOLLERR, even though we never
        // ask for it. Waking the waiters lets them retry their I/O call and observe the actual error. Without this,
        // a writer blocked on a full pipe whose reader went away would never wake up, as the kernel reports only
        // EPOLLERR (without EPOLLOUT) in that case.
        enum readWakeMask = EPOLLIN | EPOLLHUP | EPOLLERR;
        enum writeWakeMask = EPOLLOUT | EPOLLERR;

        foreach( ref event; events[0..res] ) {
            auto ctx = cast(FdContext*)event.data.ptr;
            // Readable or at EOF (as opposed to woken only by an error)
            immutable bool readReady = (event.events & (EPOLLIN | EPOLLHUP)) != 0;

            with(Direction) foreach(dir; Read..(Write+1)) {
                final switch(dir) {
                case Read:
                    if( (event.events & readWakeMask)==0 )
                        continue;
                    break;
                case Write:
                    if( (event.events & writeWakeMask)==0 )
                        continue;
                    break;
                case Both:
                    assert(false);
                }

                auto state = &ctx.states[dir];
                with(FdContext.Type) final switch(state.type) {
                case None:
                    // Since most FDs are available for write most of the time, almost any wakeup would trigger
                    // this warning. As such, we log only if one of two conditions are met:
                    // Either the FD is read ready and nobody listens for read, or the FD is write ready (but not
                    // read ready) and nobody listens for write. Wakeups caused only by EPOLLERR are not logged.
                    if( cast(Direction)dir==Read ? readReady : ((event.events & EPOLLOUT)!=0 && !readReady) ) {
                        WARN!"epoll for returned fd %s events %x which is not listening for %s"(
                                ctx.fdNum, event.events, cast(Direction)dir);
                    }
                    break;
                case FiberHandle:
                    theReactor.resumeFiber(state.fibHandle);
                    break;
                case Callback:
                    state.callback(state.opaq);
                    break;
                case CallbackOneShot:
                    // Clear the registration before calling, so the callback may re-register itself
                    state.type = None;
                    state.callback(state.opaq);
                    break;
                case SignalHandler:
                    ASSERT!"Should never happen for epoll, SignalHandler is kqueue only"(false);
                    break;
                }
            }
        }

        return true;
    }

private:
    /// Add `fd` to the epoll set, with `ctx` as the event's user data.
    ///
    /// NOTE(review): `dir` is currently ignored; the FD is always registered for both directions.
    @notrace void internalRegisterFD(int fd, FdContext* ctx, Direction dir) @trusted @nogc {
        epoll_event event = void;
        event.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLET; // Register with Edge Trigger behavior
        event.data.ptr = ctx;
        int res = epollFd.osCall!epoll_ctl(EPOLL_CTL_ADD, fd, &event);
        errnoEnforceNGC( res>=0, "Adding fd to epoll failed" );
    }

    /// Remove `fd` from the epoll set.
    @notrace void internalDeregisterFD(int fd, FdContext* ctx) nothrow @trusted @nogc {
        int res = epollFd.osCall!epoll_ctl(EPOLL_CTL_DEL, fd, null);

        // There is no reason for a registered FD to fail removal, so we assert instead of throwing
        ASSERT!"Removing fd from epoll failed with errno %s"( res>=0, errno );
    }
}
246 
247 // Unit test in mecca.reactor.io