1 /// Reactor aware FD (file descriptor) operations
2 module mecca.reactor.io.fd;
3 
4 // Licensed under the Boost license. Full copyright information in the AUTHORS file
5 
6 import core.stdc.errno;
7 import core.sys.posix.netinet.in_;
8 import core.sys.posix.netinet.tcp;
9 import unistd = core.sys.posix.unistd;
10 import fcntl = core.sys.posix.fcntl;
11 import core.sys.posix.sys.ioctl;
12 import core.sys.posix.sys.socket;
13 import core.sys.posix.sys.types;
14 import std.algorithm;
15 import std.traits;
16 
17 import mecca.lib.exception;
18 import mecca.lib.io;
19 public import mecca.lib.net;
20 import mecca.lib..string;
21 import mecca.lib.time : Timeout, msecs;
22 import mecca.log;
23 import mecca.reactor.subsystems.poller;
24 
/// Backlog value handed to the OS `listen` call by the `listen` helpers of the socket wrappers in this module.
enum LISTEN_BACKLOG = 10;
26 
/**
 * Thin wrapper around a datagram oriented socket (such as UDP).
 *
 * Every `Socket` operation is reachable through `alias this`.
 */
struct DatagramSocket {
    Socket sock;

    alias sock this;

    /**
     * Create a datagram socket
     *
     * Opens a `SOCK_DGRAM` (UDP type) socket in the address family of `bindAddr`.
     *
     * Params:
     *  bindAddr = address whose `family` selects the socket domain. Only the family is consulted by this function;
     *      the socket is not bound here.
     *
     * Returns:
     *  The newly created socket.
     *
     * Throws:
     * ErrnoException if creating the socket fails.
     */
    static DatagramSocket create(SockAddr bindAddr) @safe @nogc {
        DatagramSocket created = DatagramSocket( Socket.socket(bindAddr.family, SOCK_DGRAM, 0) );

        return created;
    }

    /**
     * Set or clear `SO_BROADCAST` on the socket
     *
     * With the option set, the socket may send datagrams to broadcast addresses.
     *
     * Params:
     *  enabled = `true` writes 1 to the option, `false` writes 0.
     */
    void enableBroadcast(bool enabled = true) @trusted @nogc {
        int optionValue = 0;
        if( enabled )
            optionValue = 1;

        sock.setSockOpt(SOL_SOCKET, SO_BROADCAST, optionValue);
    }

    unittest {
        import mecca.reactor;

        testWithReactor({
                auto udp = DatagramSocket.create( SockAddr(SockAddrIPv4.any) );

                // With the option on, sending to the broadcast address is expected to go through
                INFO!"#UT testing successful send to broadcast"();
                udp.enableBroadcast();
                udp.sendTo("Weka.IO", 0, SockAddr(SockAddrIPv4.broadcast(31337)) );

                // With the option off, the very same send is expected to throw
                INFO!"#UT testing unsuccessful send to broadcast"();
                udp.enableBroadcast(false);
                assertThrows!ErrnoException( udp.sendTo("Weka.IO", 0, SockAddrIPv4.broadcast(31337) ) );
            });
    }
}
77 
/**
 * Wrapper for connection oriented datagram sockets (`SOCK_SEQPACKET`).
 *
 * Every `Socket` operation is reachable through `alias this`.
 */
struct ConnectedDatagramSocket {
    Socket sock;

    alias sock this;

    /**
     * Create a sequenced-packet socket and connect, as client, to the address supplied
     *
     * This creates a SOCK_SEQPACKET socket. It connects it to the designated server specified in sa, and waits, through the
     * reactor, for the connection to be established.
     *
     * Params:
     *  sa = a socket address for the server to connect to.
     *  timeout = the timeout for the connection. Throws TimeoutExpired if the timeout expires
     *
     * Returns:
     *  Returns the connected socket.
     *
     * Throws:
     * TimeoutExpired if the timeout expires
     *
     * ErrnoException if the connection fails (e.g. - ECONNREFUSED if connecting to a non-listening port). Also throws this if
     *                  one of the system calls fails.
     *
     * Anything else: May throw any exception injected using throwInFiber.
     */
    static ConnectedDatagramSocket connect(SockAddr sa, Timeout timeout = Timeout.infinite) @safe @nogc {
        ConnectedDatagramSocket ret = ConnectedDatagramSocket( Socket.socket(sa.family, SOCK_SEQPACKET, 0) );

        connectHelper(ret, sa, timeout);

        return ret;
    }

    /// ditto
    static ConnectedDatagramSocket connect(SockAddrUnix sa, Timeout timeout = Timeout.infinite) @safe @nogc {
        // The timeout must be forwarded explicitly; otherwise the generic overload falls back to Timeout.infinite
        return connect(SockAddr(sa), timeout);
    }

    /**
     * Create a sequenced-packet socket and bind, as a listening server, to the address supplied
     *
     * This creates a SOCK_SEQPACKET socket. It binds it to the designated address specified in sa and puts it in listening
     * mode, using `LISTEN_BACKLOG` as the backlog.
     *
     * Params:
     *  sa = a socket address for the server to listen on.
     *  reuseAddr = Whether to set the `SO_REUSEADDR` socket option
     *
     * Returns:
     *  Returns the listening socket.
     *
     * Throws:
     * ErrnoException if binding or listening fails (e.g. - EADDRINUSE if binding to a used port). Also throws this if one of
     *                  the system calls fails.
     */
    @notrace static ConnectedDatagramSocket listen(SockAddr sa, bool reuseAddr = false) @trusted @nogc {
        ConnectedDatagramSocket sock = ConnectedDatagramSocket( Socket.socket(sa.family, SOCK_SEQPACKET, 0) );

        // The option has to be in place before bind for it to have any effect on the bind
        if( reuseAddr ) {
            sock.setSockOpt( SOL_SOCKET, SO_REUSEADDR, 1 );
        }

        sock.osCallErrno!(.bind)(&sa.base, sa.len);
        sock.osCallErrno!(.listen)(LISTEN_BACKLOG);

        return sock;
    }

    /// ditto
    @notrace static ConnectedDatagramSocket listen(SockAddrUnix sa, bool reuseAddr = false) @safe @nogc {
        return listen( SockAddr(sa), reuseAddr );
    }

    /**
     * draws a new client connection from a listening socket
     *
     * This function waits for a client to connect to the socket. Once that happens, it returns with a
     * ConnectedDatagramSocket for the new client.
     *
     * Params:
     *  clientAddr = an out parameter that receives the socket address of the client that connected.
     *  timeout = how long to wait for a new connection
     *
     * Returns:
     *  Returns the connected socket.
     *
     * Throws:
     * ErrnoException if the connection fails $(LPAREN)e.g. - EINVAL if accepting from a non-listening socket, or
     * ECONNABORTED if a connection was aborted$(RPAREN). Also throws this if one of the system calls fails.
     *
     * TimeoutExpired if the timeout expires
     *
     * Anything else: May throw any exception injected using throwInFiber.
     */
    @notrace ConnectedDatagramSocket accept(out SockAddr clientAddr, Timeout timeout = Timeout.infinite) @trusted @nogc {
        socklen_t len = SockAddr.sizeof;
        int clientFd = sock.blockingCall!(.accept)(Direction.Read, &clientAddr.base, &len, timeout);

        // Wrapping the raw fd in a ReactorFD hands its ownership to the returned socket
        auto clientSock = ConnectedDatagramSocket( Socket( ReactorFD( clientFd ) ) );

        return clientSock;
    }


    /**
     * send an entire object
     *
     * Forwards to `Socket.sendObj`; the only difference is that `flags` defaults to `MSG_EOR` here.
     */
    @notrace void sendObj(T)(auto ref const(T) data, int flags=MSG_EOR, Timeout timeout = Timeout.infinite) @safe @nogc {
        sock.sendObj(data, flags, timeout);
    }
}
191 
/**
 * Wrapper for connection oriented (stream) sockets.
 *
 * Every `Socket` operation is reachable through `alias this`.
 */
struct ConnectedSocket {
    Socket sock;

    alias sock this;

    /**
     * Create a stream socket and connect, as client, to the address supplied
     *
     * This creates a TCP socket (or equivalent for the address family). It connects it to the designated server specified in sa, and
     * waits, through the reactor, for the connection to be established.
     *
     * Params:
     *  sa = a socket address for the server to connect to (either `SockAddr` or `SockAddr*` where * is IPv4, IPv6 or Unix.)
     *  timeout = the timeout for the connection. Throws TimeoutExpired if the timeout expires
     *  nodelay = by default, Nagle algorithm is disabled for TCP connections. Setting this parameter to false reverts
     *    to the system-wide configuration.
     *
     * Returns:
     *  Returns the connected socket.
     *
     * Throws:
     * TimeoutExpired if the timeout expires
     *
     * ErrnoException if the connection fails (e.g. - ECONNREFUSED if connecting to a non-listening port). Also throws
     * this if one of the system calls fails.
     *
     * Anything else: May throw any exception injected using throwInFiber.
     */
    static ConnectedSocket connect(SockAddr sa, Timeout timeout = Timeout.infinite, bool nodelay = true) @safe @nogc {
        ConnectedSocket ret = ConnectedSocket( Socket.socket(sa.family, SOCK_STREAM, 0) );

        connectHelper(ret, sa, timeout);

        // TCP_NODELAY is only defined for TCP/IPv*. setNagle(true) is what turns TCP_NODELAY on.
        if( (sa.family == AF_INET || sa.family == AF_INET6) && nodelay ) {
            ret.setNagle(true);
        }

        return ret;
    }

    /// ditto
    static ConnectedSocket connect(SA)(SA sa, Timeout timeout = Timeout.infinite, bool nodelay = true) @safe @nogc
            if( is( typeof( SockAddr(sa) ) == SockAddr ) )
    {
        // Forward every argument; dropping them would silently force Timeout.infinite and nodelay = true
        return connect( SockAddr(sa), timeout, nodelay );
    }

    /**
     * Create a stream socket and bind, as a listening server, to the address supplied
     *
     * This creates a TCP socket (or equivalent for the address family). It binds it to the designated address specified in sa, and
     * puts it in listening mode, using `LISTEN_BACKLOG` as the backlog.
     *
     * Params:
     *  sa = a socket address for the server to listen on. The second form is for passing protocol specific addresses
     *    (`SockAddrIPv4`, `SockAddrIPv6`, `SockAddrUnix`).
     *  reuseAddr = Whether to set the `SO_REUSEADDR` socket option
     *
     * Returns:
     *  Returns the listening socket.
     *
     * Throws:
     * ErrnoException if binding or listening fails (e.g. - EADDRINUSE if binding to a used port). Also throws this if one of
     *                  the system calls fails.
     */
    @notrace static ConnectedSocket listen(SockAddr sa, bool reuseAddr = false) @trusted @nogc {
        ConnectedSocket sock = ConnectedSocket( Socket.socket(sa.family, SOCK_STREAM, 0) );

        // The option has to be in place before bind for it to have any effect on the bind
        if( reuseAddr ) {
            sock.setSockOpt( SOL_SOCKET, SO_REUSEADDR, 1 );
        }

        sock.osCallErrno!(.bind)(&sa.base, sa.len);
        sock.osCallErrno!(.listen)(LISTEN_BACKLOG);

        return sock;
    }

    /// ditto
    @notrace static ConnectedSocket listen(SA)(SA sa, bool reuseAddr = false) @trusted @nogc
            if( is( typeof(SockAddr(sa)) == SockAddr ) )
    {
        return listen( SockAddr(sa), reuseAddr );
    }

    /**
     * draws a new client connection from a listening socket
     *
     * This function waits for a client to connect to the socket. Once that happens, it returns with a ConnectedSocket for the new client.
     *
     * Params:
     *  clientAddr = an out parameter that receives the socket address of the client that connected.
     *  nodelay = by default, Nagle algorithm is disabled for TCP connections. Setting this parameter to false reverts
     *         to the system-wide configuration.
     *  timeout = how long to wait for an incoming connection
     *
     * Returns:
     *  Returns the connected socket.
     *
     * Throws:
     * ErrnoException if the connection fails (e.g. - EINVAL if accepting from a non-listening socket, or ECONNABORTED
     * if a connection was aborted). Also throws this if one of the system calls fails.
     *
     * TimeoutExpired if the timeout expires
     *
     * Anything else: May throw any exception injected using throwInFiber.
     */
    @notrace ConnectedSocket accept(out SockAddr clientAddr, bool nodelay = true, Timeout timeout = Timeout.infinite)
            @trusted @nogc
    {
        socklen_t len = SockAddr.sizeof;
        int clientFd = sock.blockingCall!(.accept)(Direction.Read, &clientAddr.base, &len, timeout);

        // Wrapping the raw fd in a ReactorFD hands its ownership to the returned socket
        auto clientSock = ConnectedSocket( Socket( ReactorFD( clientFd ) ) );

        // TCP_NODELAY is only defined for TCP/IPv*. setNagle(true) is what turns TCP_NODELAY on.
        if( nodelay && (clientAddr.family == AF_INET || clientAddr.family == AF_INET6) )
            clientSock.setNagle(true);

        return clientSock;
    }

    /**
     * Sets the `TCP_NODELAY` option on a TCP socket
     *
     * Nagle is a packet aggregation algorithm employed over TCP. When enabled, under certain conditions, data sent gets delayed, hoping to
     * combine it with future data into fewer packets. The problem is that for request/response type protocols (such as HTTP), this
     * algorithm might result in increased latency.
     *
     * $(B Note): despite the function's name, `on` is written verbatim as the value of `TCP_NODELAY`. Passing `true`
     * therefore turns the no-delay option $(I on), which disables Nagle; passing `false` clears the option. `connect` and
     * `accept` rely on this when `nodelay` is requested, so the behaviour is kept as is.
     *
     * Params:
     *  on = value to store in `TCP_NODELAY` (`true` → 1, `false` → 0).
     *
     * Throws:
     * ErrnoException if setting the option fails.
     */
    void setNagle(bool on) @trusted @nogc {
        sock.setSockOpt( IPPROTO_TCP, TCP_NODELAY, cast(int)on );
    }
}
329 
/**
 * Connect `sock` to `sa`, sleeping in the reactor while a non-blocking connect is in progress.
 *
 * Params:
 *  sock = the (not yet connected) socket to connect.
 *  sa = the address to connect to.
 *  timeout = how long to wait for an in-progress connect to complete.
 *
 * Throws:
 * ErrnoException carrying the real failure reason: either the `errno` left by `connect` itself, or the value
 * read from `SO_ERROR` once the asynchronous connect attempt has finished. May also propagate whatever
 * `poller.waitForEvent` throws.
 */
private void connectHelper(ref Socket sock, SockAddr sa, Timeout timeout) @trusted @nogc {
    if( sock.osCall!(.connect)(&sa.base, sa.len)==0 )
        return; // Connected right away

    // errno is inspected immediately after the call, before anything else gets a chance to overwrite it
    if( errno==EINPROGRESS ) {
        // Wait for the connect attempt to finish
        poller.waitForEvent(sock.ctx, sock.get.fileNo, Direction.Write, timeout);

        // The outcome of the attempt is read from SO_ERROR exactly once; it is not polled again, as a second
        // read would not report the same failure.
        int pendingError;
        socklen_t pendingErrorLen = pendingError.sizeof;

        sock.osCallErrnoMsg!(.getsockopt)(
                SOL_SOCKET, SO_ERROR, &pendingError, &pendingErrorLen, "Fetching connect error status failed");

        if( pendingError==0 )
            return; // Connected

        errno = pendingError;
    }

    // Either connect failed immediately (errno still holds its reason), or SO_ERROR reported the failure above
    errnoEnforceNGC(false, "connect");
}
347 
/**
 * Common base for the different socket wrappers.
 *
 * Holds the `ReactorFD` of the socket and exposes it through `alias this`.
 */
struct Socket {
    ReactorFD fd;

    alias fd this;

    /**
     * send data over a connected socket
     *
     * Params:
     * data = the bytes to send
     * flags = flags argument as defined for the standard socket send
     * timeout = how long to wait for the socket to become writable
     *
     * Returns:
     * The value returned by the underlying send call
     */
    @notrace ssize_t send(const void[] data, int flags=0, Timeout timeout = Timeout.infinite) @trusted @nogc {
        const numSent = fd.blockingCall!(.send)(Direction.Write, data.ptr, data.length, flags, timeout);
        return numSent;
    }

    /// ditto
    @notrace ssize_t send(const void[] data, Timeout timeout) @trusted @nogc {
        enum NO_FLAGS = 0;
        return send(data, NO_FLAGS, timeout);
    }

    /**
     * send an entire object over a connected socket
     *
     * Throws if the number of bytes sent differs from `T.sizeof`.
     */
    @notrace void sendObj(T)(auto ref const(T) data, int flags=0, Timeout timeout = Timeout.infinite) @trusted @nogc {
        objectCall!send(&data, flags, timeout);
    }

    /// ditto
    @notrace void sendObj(T)(auto ref const(T) data, Timeout timeout) @safe @nogc {
        sendObj(data, 0, timeout);
    }

    /**
     * send data over an unconnected socket
     *
     * The second form accepts the protocol specific SockAddr* types directly
     */
    ssize_t sendTo(const void[] data, int flags, SockAddr destAddr, Timeout timeout = Timeout.infinite)
            @trusted @nogc
    {
        const numSent = fd.blockingCall!(.sendto)(
                Direction.Write, data.ptr, data.length, flags, &destAddr.base, destAddr.len, timeout);
        return numSent;
    }

    /// ditto
    ssize_t sendTo(SA)(const void[] data, int flags, auto ref SA destAddr, Timeout timeout = Timeout.infinite)
            @trusted @nogc
            if( !is( SA==SockAddr ) && is( typeof(SockAddr(destAddr)) == SockAddr ) )
    {
        // Wrap the specific address type and go through the generic form
        return sendTo( data, flags, SockAddr(destAddr), timeout );
    }

    /**
     * send an entire object over an unconnected socket
     *
     * Throws if the number of bytes sent differs from `T.sizeof`.
     */
    @notrace void sendObjTo(T)(
            auto ref const(T) data, ref const(SockAddr) dst, int flags=0, Timeout timeout = Timeout.infinite)
            @trusted @nogc
    {
        objectCall!sendTo(&data, flags, dst, timeout);
    }

    /// ditto
    @notrace void sendObjTo(T)(auto ref const(T) data, ref const(SockAddr) dst, Timeout timeout) @safe @nogc {
        sendObjTo(data, dst, 0, timeout);
    }

    /**
     * Reactor friendly sendmsg.
     */
    ssize_t sendmsg(const ref msghdr msg, int flags, Timeout timeout = Timeout.infinite) @trusted @nogc {
        const numSent = fd.blockingCall!(.sendmsg)(Direction.Write, &msg, flags, timeout);
        return numSent;
    }

    /**
     * recv data from a connected socket
     *
     * Can be used on unconnected sockets as well, but then it is not possible to know who the sender was.
     *
     * Params:
     * buffer = the buffer to receive into
     * flags = flags argument as defined for the standard socket recv
     * timeout = how long to wait for data
     *
     * Returns:
     * The number of bytes actually received
     *
     * Throws:
     * May throw an ErrnoException in case of error
     *
     * Will throw TimeoutExpired if the timeout expired
     */
    @notrace ssize_t recv(void[] buffer, int flags, Timeout timeout = Timeout.infinite) @trusted @nogc {
        const numReceived = fd.blockingCall!(.recv)(Direction.Read, buffer.ptr, buffer.length, flags, timeout);
        return numReceived;
    }

    /**
     * recv whole object from a connected socket
     *
     * Can be used on unconnected sockets as well, but then it is not possible to know who the sender was. This form of
     * the call is intended for receiving objects of a size known in advance.
     *
     * Params:
     * data = pointer to data to be received.
     * flags = flags argument as defined for the standard socket recv
     * timeout = timeout
     *
     * Throws:
     * May throw an `ErrnoException` in case of a socket error. If zero bytes were received, an `ErrnoException` with
     * `errno` set to `ECONNRESET` is thrown. If any other amount of bytes that is not identical to `T.sizeof` was
     * received, a `ShortRead` exception is thrown.
     *
     * Will throw TimeoutExpired if the timeout expired
     */
    @notrace void recvObj(T)(T* data, int flags=0, Timeout timeout = Timeout.infinite) @safe @nogc
            if(!hasElaborateDestructor!T)
    {
        objectCall!recv(data, flags, timeout);
    }

    /// ditto
    @notrace void recvObj(T)(T* data, Timeout timeout) @safe @nogc {
        recvObj(data, 0, timeout);
    }

    /**
     * recv data from an unconnected socket
     *
     * Params:
     * buffer = the buffer to receive into
     * flags = flags argument as defined for the standard socket recvfrom
     * srcAddr = out parameter handed to recvfrom for storing the sender's address
     * timeout = how long to wait for data
     */
    ssize_t recvFrom(void[] buffer, int flags, out SockAddr srcAddr, Timeout timeout = Timeout.infinite) @trusted @nogc
    {
        socklen_t srcAddrLen = SockAddr.sizeof;
        const numReceived = fd.blockingCall!(.recvfrom)(
                Direction.Read, buffer.ptr, buffer.length, flags, &srcAddr.base, &srcAddrLen, timeout);
        return numReceived;
    }

    /// ditto
    ssize_t recvFrom(void[] buffer, out SockAddr srcAddr, Timeout timeout = Timeout.infinite) @safe @nogc
    {
        enum NO_FLAGS = 0;
        return recvFrom( buffer, NO_FLAGS, srcAddr, timeout );
    }

    /// Reactor friendly recvmsg.
    ssize_t recvmsg(ref msghdr msg, int flags, Timeout timeout = Timeout.infinite ) @trusted @nogc {
        const numReceived = fd.blockingCall!(.recvmsg)(Direction.Read, &msg, flags, timeout);
        return numReceived;
    }

    /**
     * Get the local address of the socket (`getsockname`).
     *
     * Throws: ErrnoException if the system call fails.
     */
    SockAddr getLocalAddress() @trusted @nogc {
        SockAddr localAddr;
        socklen_t localAddrLen = localAddr.sizeof;

        fd.osCallErrno!(.getsockname)(&localAddr.base, &localAddrLen);
        return localAddr;
    }

    /**
     * Get the remote address of a connected socket (`getpeername`).
     *
     * Throws: ErrnoException if the system call fails.
     */
    SockAddr getPeerAddress() @trusted @nogc {
        SockAddr peerAddr;
        socklen_t peerAddrLen = peerAddr.sizeof;

        fd.osCallErrno!(.getpeername)(&peerAddr.base, &peerAddrLen);
        return peerAddr;
    }

    /**
     * get the name of the socket's peer (for connected sockets only)
     *
     * Performs exactly the same query as `getPeerAddress`.
     *
     * Throws: ErrnoException (ENOTCONN) if called on an unconnected socket.
     */
    SockAddr getPeerName() @trusted @nogc {
        return getPeerAddress();
    }

    /**
     * Call the `setsockopt` on the socket
     *
     * The first form takes the raw bytes of the option value. The second form takes a single value of any type and
     * passes its in-memory representation.
     *
     * Throws ErrnoException on failure
     */
    void setSockOpt(int level, int optname, const(void)[] optval, string msg="setsockopt failed") @nogc {
        const optlen = cast(socklen_t)optval.length;
        fd.osCallErrnoMsg!(.setsockopt)( level, optname, optval.ptr, optlen, msg );
    }

    /// ditto
    void setSockOpt(T)(int level, int optname, auto ref const(T) optval, string msg="setsockopt failed") @nogc {
        // View the single value as a one element slice
        const(T)[] optvalRange = (&optval)[0..1];
        setSockOpt(level, optname, optvalRange, msg);
    }

    /**
     * Call the `getsockopt` on the socket
     *
     * The first form fills the supplied slice and returns the part of it that spans the length reported back by the
     * call. The second form fills a single non-array value in place.
     *
     * Throws ErrnoException on failure
     */
    T[] getSockOpt(T)(int level, int optname, T[] optval) @nogc {
        void[] raw = optval;
        socklen_t rawLen = cast(socklen_t)raw.length;

        fd.osCallErrno!(.getsockopt)( level, optname, raw.ptr, &rawLen );

        // rawLen now holds the length reported back by getsockopt
        return cast(T[]) raw[0..rawLen];
    }

    /// ditto
    void getSockOpt(T)(int level, int optname, ref T optval) @nogc if(! isArray!T) {
        // View the single value as a one element slice
        T[] optvalRange = (&optval)[0..1];
        getSockOpt(level, optname, optvalRange);
    }

private:
    /// Create a new OS socket and wrap it. Throws ErrnoException if the OS refuses.
    @notrace static Socket socket(sa_family_t domain, int type, int protocol) @trusted @nogc {
        const int rawFd = .socket(domain, type, protocol);
        errnoEnforceNGC( rawFd>=0, "socket creation failed" );

        return Socket( ReactorFD(rawFd) );
    }

    /**
     * Run the transfer function `F` over the bytes of a single `T`, insisting that all of it was transferred.
     *
     * A transfer of zero bytes is reported as an `ErrnoException` with `ECONNRESET`; any other partial transfer is
     * reported as `ShortRead`.
     */
    @notrace void objectCall(alias F, T)(T* object, Parameters!F[1..$] args) @trusted @nogc {
        const transferred = F(object[0..1], args);
        if( transferred==T.sizeof )
            return;

        if( transferred!=0 )
            throw mkExFmt!ShortRead("%s(%s)", __traits(identifier, F), fd.get().fileNo);

        errno = ECONNRESET; // Other side closed. Inject "Connection reset by peer"
        throw mkExFmt!ErrnoException("%s(%s)", __traits(identifier, F), fd.get().fileNo);
    }
}
582 
// Streams NUM_BUFFERS buffers of BUF_SIZE bytes from a client fiber to a server fiber over a loopback connection,
// and verifies both the content and the total amount received.
unittest {
    import mecca.reactor;
    import mecca.reactor.sync.event;

    theReactor.setup();
    scope(success) theReactor.teardown();

    enum BUF_SIZE = 128;
    enum NUM_BUFFERS = 16384;
    SockAddr listenAddr;
    Event listening;

    void server() {
        ConnectedSocket listener = ConnectedSocket.listen( SockAddr(SockAddrIPv4.any()) );
        listenAddr = listener.getLocalAddress();
        INFO!"Listening socket on %s"(listenAddr.toString()); // TODO remove reliance on GC
        // Let the client know the address is now available
        listening.set();

        SockAddr clientAddr;
        ConnectedSocket clientSock = listener.accept(clientAddr);

        char[BUF_SIZE] buffer;

        uint totalSize;
        // A read of zero bytes means the client closed the connection
        for( auto size = clientSock.read(buffer); size != 0; size = clientSock.read(buffer) ) {
            assertEQ( buffer[0], cast(ubyte)(totalSize / BUF_SIZE), "Got incorrect value from socket" );
            totalSize += size;
        }

        assertEQ( totalSize, NUM_BUFFERS * BUF_SIZE, "Did not get the expected number of bytes" );

        theReactor.stop();
    }

    void client() {
        // Wait until the server has published the address it listens on
        listening.wait();
        INFO!"Connecting to %s"(listenAddr.toString()); // TODO remove GC
        SockAddr serverAddr = SockAddrIPv4.loopback(listenAddr.ipv4.port);
        ConnectedSocket conn = ConnectedSocket.connect( serverAddr );

        char[BUF_SIZE] buffer;
        foreach( uint i; 0..NUM_BUFFERS ) {
            // Each buffer is filled with the low byte of its own index
            buffer[] = cast(ubyte)i;
            conn.write( buffer );
        }
    }

    theReactor.spawnFiber(&server);
    theReactor.spawnFiber(&client);

    theReactor.start();
}
637 
/// Reactor aware FD wrapper for files. All `ReactorFD` operations are reachable through `alias this`.
struct File {
    ReactorFD fd;

    alias fd this;

    /**
     * Open an existing named file.
     *
     * Parameters are as defined for the open system call. `flags` must not have `O_CREAT` set (use the other overload
     * for that case); the file mode passed on is 0.
     */
    void open(string pathname, int flags) @trusted @nogc {
        immutable bool wantsCreate = (flags & fcntl.O_CREAT)!=0;
        DBG_ASSERT!"open called with O_CREAT but no file mode argument. Flags %x"( !wantsCreate, flags );

        open(pathname, flags, 0);
    }

    /**
     * Open or create a named file.
     *
     * Parameters are as defined for the open system call. Must not be called while a file is already open.
     *
     * Throws:
     * ErrnoException if the open system call fails.
     */
    void open(string pathname, int flags, mode_t mode) @trusted @nogc {
        ASSERT!"open called on already open file."(!fd.isValid);

        immutable int rawFd = fcntl.open(toStringzNGC(pathname), flags, mode);
        errnoEnforceNGC( rawFd>=0, "Failed to open file" );

        // Ownership of the OS fd passes to the ReactorFD
        fd = ReactorFD(rawFd);
    }
}
668 
/**
 * An FD capable of performing sleeping operations through the reactor, when necessary
 *
 * The struct is not copyable. It owns the underlying `FD` together with the poller context the FD was registered
 * under, and closes both when destroyed.
 */
struct ReactorFD {
private:
    FD fd;
    Poller.FdContext* ctx;

public:
    @disable this(this);

    /**
     * Constructor from a bare OS file descriptor
     *
     * Params:
     * fd = bare OS fd. Ownership is handed to the ReactorFD.
     * alreadyNonBlocking = whether the OS fd has NONBLOCKING already set on it. Setting to true saves a call to fcntl, but will hang the
     *             reactor in some cases.
     */
    this(int fd, bool alreadyNonBlocking = false) @safe @nogc {
        this( FD(fd), alreadyNonBlocking );
    }

    /**
     * Constructor from existing mecca.lib.FD
     *
     * Params:
     * fd = an FD rvalue
     * alreadyNonBlocking = whether the OS fd has NONBLOCKING already set on it. Setting to true saves a call to fcntl, but will hang the
     *             reactor in some cases.
     */
    this(FD fd, bool alreadyNonBlocking = false) @safe @nogc {
        move( fd, this.fd );
        ctx = poller.registerFD(this.fd, alreadyNonBlocking);
    }

    ~this() nothrow @safe @nogc {
        close();
    }

    /// Move semantics opAssign
    ref ReactorFD opAssign(ReactorFD rhs) return nothrow @safe @nogc {
        // After the swap rhs holds our previous state, which its destructor closes when rhs goes out of scope
        swap( rhs.fd, fd );
        swap( rhs.ctx, ctx );

        return this;
    }

    /// Cleanly closes an FD. Does nothing if the FD is not open.
    void close() nothrow @safe @nogc {
        if( fd.isValid ) {
            DBG_ASSERT!"%s Asked to close fd %s with null context"(ctx !is null, &this, fd.fileNo);

            poller.deregisterFd( fd, ctx, true );

            fd.close();
            ctx = null;
        }
    }

    /// Tests for open descriptor
    @property bool isValid() const pure nothrow @safe @nogc {
        return fd.isValid;
    }

    /// Returns the underlying mecca.lib.io.FD
    @property ref FD get() return nothrow @safe @nogc {
        return fd;
    }

    /// Perform reactor aware @safe read
    @notrace ssize_t read(void[] buffer, Timeout timeout = Timeout.infinite) @trusted @nogc {
        return blockingCall!(unistd.read)( Direction.Read, buffer.ptr, buffer.length, timeout );
    }

    /// ditto
    @notrace ssize_t read(T)(T* ptr, Timeout timeout = Timeout.infinite) @trusted @nogc {
        // Read into the memory of the single object pointed to
        return read(ptr[0..1], timeout);
    }

    /// Perform reactor aware @safe write
    @notrace ssize_t write(const void[] buffer, Timeout timeout = Timeout.infinite) @trusted @nogc {
        return blockingCall!(unistd.write)( Direction.Write, buffer.ptr, buffer.length, timeout );
    }

    /// ditto
    @notrace ssize_t write(T)(const(T)* buffer, Timeout timeout = Timeout.infinite) @trusted @nogc {
        // Write the memory of the single object pointed to
        return write(buffer[0..1], timeout);
    }

    /// `fcntl` and `ioctl` on this FD, going through `osCallErrno`.
    alias fcntl = osCallErrno!(.fcntl.fcntl);
    /// ditto
    alias ioctl = osCallErrno!(.ioctl);

    /** Take an FD out of the control of the reactor
     *
     * This has the same effect as close, except the fd itself remains open.
     *
     * Returns:
     * The FD (rvalue) controlling the underlying OS fd. If the ReactorFD is not open, an invalid (default) FD.
     */
    FD passivify() @safe @nogc {
        if( !fd.isValid )
            return FD();

        poller.deregisterFd( fd, ctx );
        ctx = null;

        return move(fd);
    }

    /**
     * Register a user callback to be called if the FD is "active"
     *
     * $(B Warning): Using this function without understanding the underlying mechanism might cause the callback to not
     * get called. Use with caution!
     *
     * Registers a callback to be called the next time an event is available on the file descriptor. The semantics of
     * when the callback will be called follow the same rules as epoll's edge trigger mode. This means that if the
     * last operation performed with the FD did not block, the callback will not be called.
     *
     * One way to make sure this doesn't happen is to call a blocking function with a timeout of zero.
     *
     * Use `unregisterCallback` to unregister the callback.
     *
     * Params:
     * dir = direction (`Read`/`Write`) to register. The `Direction.Both` is illegal here.
     * dlg = the delegate to be called
     * opaq = a value that will be passed, as is, to the delegate.
     * oneShot = if set to `true`, the callback will automatically be deregistered after being called once.
     */
    void registerCallback(Direction dir, void delegate(void*) dlg, void* opaq, bool oneShot = true) nothrow @safe @nogc {
        poller.registerFdCallback(ctx, dir, dlg, opaq, oneShot);
    }

    /**
     * Unregister a callback previously registered with `registerCallback`
     *
     * Params:
     * dir = the direction whose callback should be removed.
     */
    void unregisterCallback(Direction dir) nothrow @safe @nogc {
        poller.unregisterFdCallback(ctx, dir);
    }

package(mecca.reactor):
    /**
     * Run the OS function `F` on this FD, sleeping in the reactor for as long as it reports it would block
     *
     * `F` must take the fd as its first (`int`) parameter and return a signed type. The call is repeated after every
     * wait until it returns a non-negative result.
     *
     * Params:
     * dir = the direction to wait for when the call would block.
     * args = the arguments of `F` that follow the fd.
     * timeout = timeout handed to the poller on every wait.
     *
     * Returns:
     * The (non-negative) return value of `F`.
     *
     * Throws:
     * ErrnoException if `F` fails with anything other than `EAGAIN`/`EWOULDBLOCK`. May also propagate whatever
     * `poller.waitForEvent` throws.
     */
    auto blockingCall(alias F)(Direction dir, Parameters!F[1 .. $] args, Timeout timeout) @system @nogc {
        static assert (is(Parameters!F[0] == int));
        static assert (isSigned!(ReturnType!F));

        while (true) {
            auto ret = fd.osCall!F(args);
            if (ret < 0) {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    // Nothing to do right now: sleep until the poller reports activity, then retry
                    poller.waitForEvent(ctx, fd.fileNo, dir, timeout);
                }
                else {
                    throw mkExFmt!ErrnoException("%s(%s)", __traits(identifier, F), fd.fileNo);
                }
            }
            else {
                return ret;
            }
        }
    }

    /// Run the OS function `F` on this FD once, returning its result as is (no waiting, no error handling).
    auto osCall(alias F)(Parameters!F[1..$] args) nothrow @system @nogc {
        return fd.osCall!F(args);
    }

    /**
     * Run the OS function `F` on this FD through `osCallErrnoMsg`, using a default error message
     *
     * The message is "Running NAME failed", where NAME is the last component of `F`'s fully qualified name.
     */
    auto osCallErrno(alias F)(Parameters!F[1..$] args) @system @nogc if(isSigned!(ReturnType!F) && isIntegral!(ReturnType!F)) {
        enum FuncFullName = fullyQualifiedName!F;

        import std.string : lastIndexOf;
        // Keep only what follows the last '.'. Should there be no dot, lastIndexOf yields -1 and the whole name is kept
        enum FuncName = FuncFullName[ lastIndexOf(FuncFullName, '.')+1 .. $ ];

        enum ErrorMessage = "Running " ~ FuncName ~ " failed";
        return osCallErrnoMsg!F(args, ErrorMessage);
    }

    /**
     * Run the OS function `F` on this FD through the FD's `checkedCall`, passing `msg` as the error message
     *
     * Returns:
     * The return value of `F`, as returned by `checkedCall`.
     */
    auto osCallErrnoMsg(alias F)(Parameters!F[1..$] args, string msg) @system @nogc
            if(isSigned!(ReturnType!F) && isIntegral!(ReturnType!F))
    {
        alias RetType = ReturnType!F;
        RetType ret = fd.checkedCall!F(args, msg);

        return ret;
    }
}
851 
/// Opens the module's `poller`. Forwards to `poller.open()`.
void _openReactorEpoll() {
    poller.open();
}
855 
/// Closes the module's `poller`. Forwards to `poller.close()`.
void _closeReactorEpoll() {
    poller.close();
}
859 
860 version(unittest):
/// Test fixture pushing 2MB through a pipe between two fibers, once per poller configuration.
private class UnitTest {
    import core.sys.posix.sys.types;

    import mecca.lib.consts;
    import mecca.reactor;
    import mecca.runtime.ut;

    /// The actual test. Expects the reactor to be set up by the caller.
    void testBody() {
        FD pipeReadFD, pipeWriteFD;
        createPipe(pipeReadFD, pipeWriteFD);
        ReactorFD readEnd = ReactorFD(move(pipeReadFD));
        ReactorFD writeEnd = ReactorFD(move(pipeWriteFD));

        void reader() {
            uint[1024] chunk;
            enum CHUNK_BYTES = typeof(chunk).sizeof;
            // Wraps around to 0 on the first pre-increment below
            uint lastSeen = uint.max;

            // Receive the chunks sent over the pipe, until it is closed or fails
            ssize_t numRead = readEnd.read(chunk);
            while( numRead>0 ) {
                assert(numRead==CHUNK_BYTES, "Short read from pipe");
                assert(chunk[0] == ++lastSeen, "Read incorrect value from buffer");

                numRead = readEnd.read(chunk);
            }

            errnoEnforceNGC(numRead==0, "Read failed from pipe");
            INFO!"Reader finished"();
            theReactor.stop();
        }

        void writer() {
            uint[1024] chunk;
            enum CHUNK_BYTES = typeof(chunk).sizeof;
            enum NUM_CHUNKS = 2*MB/CHUNK_BYTES;

            // Send 2MB over the pipe. The first element of each chunk carries the chunk's sequence number
            for( ; chunk[0] < NUM_CHUNKS; chunk[0]++ ) {
                ssize_t numWritten = writeEnd.write(chunk);
                errnoEnforceNGC( numWritten>=0, "Write failed on pipe");
                assert( numWritten==CHUNK_BYTES, "Short write to pipe" );
            }

            INFO!"Writer finished - closing pipe"();
            // Closing the write end is what lets the reader see end of file
            writeEnd.close();
        }

        theReactor.spawnFiber(&reader);
        theReactor.spawnFiber(&writer);

        theReactor.start();
    }

    @mecca_ut:
    /// Run the test with the reactor's default setup.
    void normalPoller() {
        theReactor.setup();
        scope(success) theReactor.teardown();

        testBody();
    }

    /// Run the test without the default idler, polling from a recurring 1ms timer instead.
    void timedPoller() {
        Reactor.OpenOptions options;
        options.registerDefaultIdler = false;

        theReactor.setup(options);
        scope(success) theReactor.teardown();

        theReactor.registerRecurringTimer(1.msecs, &poller.poll);

        testBody();
    }

    mixin TEST_FIXTURE!UnitTest;
}