1 /// Reactor aware FD (file descriptor) operations 2 module mecca.reactor.io.fd; 3 4 // Licensed under the Boost license. Full copyright information in the AUTHORS file 5 6 import core.stdc.errno; 7 import core.sys.posix.netinet.in_; 8 import core.sys.posix.netinet.tcp; 9 import unistd = core.sys.posix.unistd; 10 import fcntl = core.sys.posix.fcntl; 11 import core.sys.posix.sys.ioctl; 12 import core.sys.posix.sys.socket; 13 import core.sys.posix.sys.types; 14 import std.algorithm; 15 import std.traits; 16 17 import mecca.lib.exception; 18 import mecca.lib.io; 19 public import mecca.lib.net; 20 import mecca.lib..string; 21 import mecca.lib.time : Timeout, msecs; 22 import mecca.log; 23 import mecca.reactor.subsystems.poller; 24 25 enum LISTEN_BACKLOG = 10; 26 27 /** 28 Wrapper for datagram oriented socket (such as UDP) 29 */ 30 struct DatagramSocket { 31 Socket sock; 32 33 alias sock this; 34 35 /** 36 * Create a datagram socket 37 * 38 * This creates a SOCK_DGRAM (UDP type) socket. 39 * 40 * Params: 41 * bindAddr = a socket address for the server to connect to. 42 * 43 * Returns: 44 * Returns the newly created socket. 45 * 46 * Throws: 47 * ErrnoException if the connection fails. Also throws this if one of the system calls fails. 48 */ 49 static DatagramSocket create(SockAddr bindAddr) @safe @nogc { 50 return DatagramSocket( Socket.socket(bindAddr.family, SOCK_DGRAM, 0) ); 51 } 52 53 /** 54 * Enable `SOL_BROADCAST` on the socket 55 * 56 * This allows the socket to send datagrams to broadcast addresses. 57 */ 58 void enableBroadcast(bool enabled = true) @trusted @nogc { 59 int flag = enabled ? 1 : 0; 60 sock.setSockOpt(SOL_SOCKET, SO_BROADCAST, flag); 61 } 62 63 unittest { 64 import mecca.reactor; 65 66 testWithReactor({ 67 auto sock = DatagramSocket.create( SockAddr(SockAddrIPv4.any) ); 68 INFO!"#UT testing successful send to broadcast"(); 69 sock.enableBroadcast(true); 70 sock.sendTo("Weka.IO", 0, SockAddr(SockAddrIPv4.broadcast(31337)) ); 71 INFO!"#UT testing unsuccessful send to broadcast"(); 72 sock.enableBroadcast(false); 73 assertThrows!ErrnoException( sock.sendTo("Weka.IO", 0, SockAddrIPv4.broadcast(31337) ) ); 74 }); 75 } 76 } 77 78 /** 79 * Wrapper for connection oriented datagram sockets. 80 */ 81 struct ConnectedDatagramSocket { 82 Socket sock; 83 84 alias sock this; 85 86 /** 87 * Create a stream socket and connect, as client, to the address supplied 88 * 89 * This creates a SOCK_SEQPACKET socket. It connects it to the designated server specified in sa, and waits, through the reactor, for 90 * the connection to be established. 91 * 92 * Params: 93 * sa = a socket address for the server to connect to. 94 * timeout = the timeout for the connection. Throws TimeoutExpired if the timeout expires 95 * 96 * Returns: 97 * Returns the connected socket. 98 * 99 * Throws: 100 * TimeoutExpired if the timeout expires 101 * 102 * ErrnoException if the connection fails (e.g. - ECONNREFUSED if connecting to a non-listening port). Also throws this if one of the 103 * system calls fails. 104 * 105 * Anything else: May throw any exception injected using throwInFiber. 106 */ 107 static ConnectedDatagramSocket connect(SockAddr sa, Timeout timeout = Timeout.infinite) @safe @nogc { 108 ConnectedDatagramSocket ret = ConnectedDatagramSocket( Socket.socket(sa.family, SOCK_SEQPACKET, 0) ); 109 110 connectHelper(ret, sa, timeout); 111 112 return ret; 113 } 114 115 /// ditto 116 static ConnectedDatagramSocket connect(SockAddrUnix sa, Timeout timeout = Timeout.infinite) @safe @nogc { 117 return connect(SockAddr(sa)); 118 } 119 120 /** 121 * Create a datagram stream socket and bind, as a listening server, to the address supplied 122 * 123 * This creates a SOCK_SEQPACKET socket. It binds it to the designated address specified in sa and puts it in listening mode. 124 * 125 * Params: 126 * sa = a socket address for the server to listen on. 127 * reuseAddr = Whether to set the `SO_REUSEADDR` socket option 128 * 129 * Returns: 130 * Returns the listening socket. 131 * 132 * Throws: 133 * ErrnoException if the connection fails (e.g. - EADDRINUSE if binding to a used port). Also throws this if one of the 134 * system calls fails. 135 */ 136 @notrace static ConnectedDatagramSocket listen(SockAddr sa, bool reuseAddr = false) @trusted @nogc { 137 ConnectedDatagramSocket sock = ConnectedDatagramSocket( Socket.socket(sa.family, SOCK_SEQPACKET, 0) ); 138 139 if( reuseAddr ) { 140 sock.setSockOpt( SOL_SOCKET, SO_REUSEADDR, 1 ); 141 } 142 143 sock.osCallErrno!(.bind)(&sa.base, sa.len); 144 sock.osCallErrno!(.listen)(LISTEN_BACKLOG); 145 146 return sock; 147 } 148 149 /// ditto 150 @notrace static ConnectedDatagramSocket listen(SockAddrUnix sa, bool reuseAddr = false) @safe @nogc { 151 return listen( SockAddr(sa), reuseAddr ); 152 } 153 154 /** 155 * draws a new client connection from a listening socket 156 * 157 * This function waits for a client to connect to the socket. Once that happens, it returns with a ConnectedSocket for the new client. 158 * 159 * Params: 160 * clientAddr = an out parameter that receives the socket address of the client that connected. 161 * timeout = how long to wait for a new connection 162 * 163 * Returns: 164 * Returns the connected socket. 165 * 166 * Throws: 167 * ErrnoException if the connection fails $(LPAREN)e.g. - EINVAL if accepting from a non-listening socket, or 168 * ECONNABORTED if a connection was aborted$(RPAREN). Also throws this if one of the system calls fails. 169 * 170 * TimeoutExpired if the timeout expires 171 * 172 * Anything else: May throw any exception injected using throwInFiber. 173 */ 174 @notrace ConnectedDatagramSocket accept(out SockAddr clientAddr, Timeout timeout = Timeout.infinite) @trusted @nogc { 175 socklen_t len = SockAddr.sizeof; 176 int clientFd = sock.blockingCall!(.accept)(Direction.Read, &clientAddr.base, &len, timeout); 177 178 auto clientSock = ConnectedDatagramSocket( Socket( ReactorFD( clientFd ) ) ); 179 180 return clientSock; 181 } 182 183 184 /** 185 * send an entire object 186 */ 187 @notrace void sendObj(T)(auto ref const(T) data, int flags=MSG_EOR, Timeout timeout = Timeout.infinite) @safe @nogc { 188 sock.sendObj(data, flags, timeout); 189 } 190 } 191 192 /** 193 * Wrapper for connection oriented sockets. 194 */ 195 struct ConnectedSocket { 196 Socket sock; 197 198 alias sock this; 199 200 /** 201 * Create a stream socket and connect, as client, to the address supplied 202 * 203 * This creates a TCP socket (or equivalent for the address family). It connects it to the designated server specified in sa, and 204 * waits, through the reactor, for the connection to be established. 205 * 206 * Params: 207 * sa = a socket address for the server to connect to (either `SockAddr` or `SockAddr*` where * is IPv4, IPv6 or Unix.) 208 * timeout = the timeout for the connection. Throws TimeoutExpired if the timeout expires 209 * nodelay = by default, Nagle algorithm is disabled for TCP connections. Setting this parameter to false reverts 210 * to the system-wide configuration. 211 * 212 * Returns: 213 * Returns the connected socket. 214 * 215 * Throws: 216 * TimeoutExpired if the timeout expires 217 * 218 * ErrnoException if the connection fails (e.g. - ECONNREFUSED if connecting to a non-listening port). Also throws 219 * this if one of the system calls fails. 220 * 221 * Anything else: May throw any exception injected using throwInFiber. 222 */ 223 static ConnectedSocket connect(SockAddr sa, Timeout timeout = Timeout.infinite, bool nodelay = true) @safe @nogc { 224 ConnectedSocket ret = ConnectedSocket( Socket.socket(sa.family, SOCK_STREAM, 0) ); 225 226 connectHelper(ret, sa, timeout); 227 228 // Nagle is only defined for TCP/IPv* 229 if( (sa.family == AF_INET || sa.family == AF_INET6) && nodelay ) { 230 ret.setNagle(true); 231 } 232 233 return ret; 234 } 235 236 /// ditto 237 static ConnectedSocket connect(SA)(SA sa, Timeout timeout = Timeout.infinite, bool nodelay = true) @safe @nogc 238 if( is( typeof( SockAddr(sa) ) == SockAddr ) ) 239 { 240 return connect( SockAddr(sa) ); 241 } 242 243 /** 244 * Create a stream socket and bind, as a listening server, to the address supplied 245 * 246 * This creates a TCP socket (or equivalent for the address family). It binds it to the designated address specified in sa, and 247 * puts it in listening mode. 248 * 249 * Params: 250 * sa = a socket address for the server to listen on. The second form is for passing protocol specific addresses 251 * (`SockAddrIPv4`, `SockAddrIPv6`, `SockAddrUnix`). 252 * reuseAddr = Whether to set the `SO_REUSEADDR` socket option 253 * 254 * Returns: 255 * Returns the listening socket. 256 * 257 * Throws: 258 * ErrnoException if the connection fails (e.g. - EADDRINUSE if binding to a used port). Also throws this if one of the 259 * system calls fails. 260 */ 261 @notrace static ConnectedSocket listen(SockAddr sa, bool reuseAddr = false) @trusted @nogc { 262 ConnectedSocket sock = ConnectedSocket( Socket.socket(sa.family, SOCK_STREAM, 0) ); 263 264 if( reuseAddr ) { 265 sock.setSockOpt( SOL_SOCKET, SO_REUSEADDR, 1 ); 266 } 267 268 sock.osCallErrno!(.bind)(&sa.base, sa.len); 269 sock.osCallErrno!(.listen)(LISTEN_BACKLOG); 270 271 return sock; 272 } 273 274 /// ditto 275 @notrace static ConnectedSocket listen(SA)(SA sa, bool reuseAddr = false) @trusted @nogc 276 if( is( typeof(SockAddr(sa)) == SockAddr ) ) 277 { 278 return listen( SockAddr(sa), reuseAddr ); 279 } 280 281 /** 282 * draws a new client connection from a listening socket 283 * 284 * This function waits for a client to connect to the socket. Once that happens, it returns with a ConnectedSocket for the new client. 285 * 286 * Params: 287 * clientAddr = an out parameter that receives the socket address of the client that connected. 288 * nodelay = by default, Nagle algorithm is disabled for TCP connections. Setting this parameter to false reverts 289 * to the system-wide configuration. 290 * timeout = how long to wait for an incoming connection 291 * 292 * Returns: 293 * Returns the connected socket. 294 * 295 * Throws: 296 * ErrnoException if the connection fails (e.g. - EINVAL if accepting from a non-listening socket, or ECONNABORTED 297 * if a connection was aborted). Also throws this if one of the system calls fails. 298 * 299 * TimeoutExpired if the timeout expires 300 * 301 * Anything else: May throw any exception injected using throwInFiber. 302 */ 303 @notrace ConnectedSocket accept(out SockAddr clientAddr, bool nodelay = true, Timeout timeout = Timeout.infinite) 304 @trusted @nogc 305 { 306 socklen_t len = SockAddr.sizeof; 307 int clientFd = sock.blockingCall!(.accept)(Direction.Read, &clientAddr.base, &len, timeout); 308 309 auto clientSock = ConnectedSocket( Socket( ReactorFD( clientFd ) ) ); 310 if( nodelay && (clientAddr.family == AF_INET || clientAddr.family == AF_INET6) ) 311 clientSock.setNagle(true); 312 313 return clientSock; 314 } 315 316 /** 317 * Enables or disables Nagle on a TCP socket 318 * 319 * Nagle is a packet aggregation algorithm employed over TCP. When enabled, under certain conditions, data sent gets delayed, hoping to 320 * combine it with future data into less packets. The problem is that for request/response type protocols (such as HTTP), this algorithm 321 * might result in increased latency. 322 * 323 * This function allows selectively enabling/disabling Nagle on TCP sockets. 324 */ 325 void setNagle(bool on) @trusted @nogc { 326 sock.setSockOpt( IPPROTO_TCP, TCP_NODELAY, cast(int)on ); 327 } 328 } 329 330 private void connectHelper(ref Socket sock, SockAddr sa, Timeout timeout) @trusted @nogc { 331 int result = sock.osCall!(.connect)(&sa.base, sa.len); 332 333 while(result!=0 && errno==EINPROGRESS) { 334 // Wait for connect to finish 335 poller.waitForEvent(sock.ctx, sock.get.fileNo, Direction.Write, timeout); 336 337 socklen_t reslen = result.sizeof; 338 339 sock.osCallErrnoMsg!(.getsockopt)( SOL_SOCKET, SO_ERROR, &result, &reslen, "Fetching connect error status failed"); 340 } 341 342 if( result!=0 ) { 343 errno = result; 344 errnoEnforceNGC(false, "connect"); 345 } 346 } 347 348 /** 349 * Base class for the different types of sockets 350 */ 351 struct Socket { 352 ReactorFD fd; 353 354 alias fd this; 355 356 /** 357 * send data over a connected socket 358 */ 359 @notrace ssize_t send(const void[] data, int flags=0, Timeout timeout = Timeout.infinite) @trusted @nogc { 360 return fd.blockingCall!(.send)(Direction.Write, data.ptr, data.length, flags, timeout); 361 } 362 363 /// ditto 364 @notrace ssize_t send(const void[] data, Timeout timeout) @trusted @nogc { 365 return send(data, 0, timeout); 366 } 367 368 /** 369 * send an entire object over a connected socket 370 */ 371 @notrace void sendObj(T)(auto ref const(T) data, int flags=0, Timeout timeout = Timeout.infinite) @trusted @nogc { 372 objectCall!send(&data, flags, timeout); 373 } 374 375 /// ditto 376 @notrace void sendObj(T)(auto ref const(T) data, Timeout timeout) @safe @nogc { 377 sendObj(data, 0, timeout); 378 } 379 380 /** 381 * send data over an unconnected socket 382 * 383 * The second form allows passing SockAddr* types directly 384 */ 385 ssize_t sendTo(const void[] data, int flags, SockAddr destAddr, Timeout timeout = Timeout.infinite) 386 @trusted @nogc 387 { 388 return fd.blockingCall!(.sendto)( 389 Direction.Write, data.ptr, data.length, flags, &destAddr.base, destAddr.len, timeout); 390 } 391 392 /// ditto 393 ssize_t sendTo(SA)(const void[] data, int flags, auto ref SA destAddr, Timeout timeout = Timeout.infinite) 394 @trusted @nogc 395 if( !is( SA==SockAddr ) && is( typeof(SockAddr(destAddr)) == SockAddr ) ) 396 { 397 return sendTo( data, flags, SockAddr(destAddr), timeout ); 398 } 399 400 /** 401 * send an entire object over an unconnected socket 402 */ 403 @notrace void sendObjTo(T)( 404 auto ref const(T) data, ref const(SockAddr) dst, int flags=0, Timeout timeout = Timeout.infinite) 405 @trusted @nogc 406 { 407 objectCall!sendTo(&data, flags, dst, timeout); 408 } 409 410 /// ditto 411 @notrace void sendObjTo(T)(auto ref const(T) data, ref const(SockAddr) dst, Timeout timeout) @safe @nogc { 412 sendObjTo(data, dst, 0, timeout); 413 } 414 415 /** 416 * Implementation of sendmsg. 417 */ 418 ssize_t sendmsg(const ref msghdr msg, int flags, Timeout timeout = Timeout.infinite) @trusted @nogc { 419 return fd.blockingCall!(.sendmsg)(Direction.Write, &msg, flags, timeout); 420 } 421 422 /** 423 * recv data from a connected socket 424 * 425 * Can be used on unconnected sockets as well, but then it is not possible to know who the sender was. 426 * 427 * Params: 428 * buffer = the buffer range to send 429 * flags = flags argument as defined for the standard socket recv 430 * timeout = how long to wait for data 431 * 432 * Returns: 433 * The number of bytes actually received 434 * 435 * Throws: 436 * May throw an ErrnoException in case of error 437 * 438 * Will throw TimeoutExpired if the timeout expired 439 */ 440 @notrace ssize_t recv(void[] buffer, int flags, Timeout timeout = Timeout.infinite) @trusted @nogc { 441 return fd.blockingCall!(.recv)(Direction.Read, buffer.ptr, buffer.length, flags, timeout); 442 } 443 444 /** 445 * recv whole object from a connected socket 446 * 447 * Can be used on unconnected sockets as well, but then it is not possible to know who the sender was. This form of 448 * the call is intended for recieving object of absolute know size. 449 * 450 * Params: 451 * data = pointer to data to be received. 452 * flags = flags ardument as defined for the standard socket recv 453 * timeout = timeout 454 * 455 * Throws: 456 * May throw an `ErrnoException` in case of a socket error. If amount of bytes received is not identical to 457 * `sizeof(T)`, will throw `ShortRead` excetpion, which inherits from `ErrnoException` with `errno` set to 458 * `EREMOTEIO` (remote IO error). 459 * 460 * Will throw TimeoutExpired if the timeout expired 461 */ 462 @notrace void recvObj(T)(T* data, int flags=0, Timeout timeout = Timeout.infinite) @safe @nogc 463 if(!hasElaborateDestructor!T) 464 { 465 objectCall!recv(data, flags, timeout); 466 } 467 468 /// ditto 469 @notrace void recvObj(T)(T* data, Timeout timeout) @safe @nogc { 470 recvObj(data, 0, timeout); 471 } 472 473 /** 474 * recv data from an unconnected socket 475 */ 476 ssize_t recvFrom(void[] buffer, int flags, out SockAddr srcAddr, Timeout timeout = Timeout.infinite) @trusted @nogc 477 { 478 socklen_t addrLen = SockAddr.sizeof; 479 return fd.blockingCall!(.recvfrom)(Direction.Read, buffer.ptr, buffer.length, flags, &srcAddr.base, &addrLen, timeout); 480 } 481 482 /// ditto 483 ssize_t recvFrom(void[] buffer, out SockAddr srcAddr, Timeout timeout = Timeout.infinite) @safe @nogc 484 { 485 return recvFrom( buffer, 0, srcAddr, timeout ); 486 } 487 488 /// Implement the recvmsg system call in a reactor friendly way. 489 ssize_t recvmsg(ref msghdr msg, int flags, Timeout timeout = Timeout.infinite ) @trusted @nogc { 490 return fd.blockingCall!(.recvmsg)(Direction.Read, &msg, flags, timeout); 491 } 492 493 /** 494 * Get the local address of the socket. 495 */ 496 SockAddr getLocalAddress() @trusted @nogc { 497 SockAddr sa; 498 socklen_t saLen = sa.sizeof; 499 fd.osCallErrno!(.getsockname)(&sa.base, &saLen); 500 501 return sa; 502 } 503 504 /** 505 * Get the remote address of a connected socket. 506 */ 507 SockAddr getPeerAddress() @trusted @nogc { 508 SockAddr sa; 509 socklen_t saLen = sa.sizeof; 510 fd.osCallErrno!(.getpeername)(&sa.base, &saLen); 511 512 return sa; 513 } 514 515 /** 516 * get the name of the socket's peer (for connected sockets only) 517 * 518 * Throws: ErrnoException (ENOTCONN) if called on an unconnected socket. 519 */ 520 SockAddr getPeerName() @trusted @nogc { 521 SockAddr sa; 522 socklen_t saLen = sa.sizeof; 523 fd.osCallErrno!(.getpeername)(&sa.base, &saLen); 524 525 return sa; 526 } 527 528 /** 529 * Call the `setsockopt` on the socket 530 * 531 * Throws ErrnoException on failure 532 */ 533 void setSockOpt(int level, int optname, const(void)[] optval, string msg="setsockopt failed") @nogc { 534 fd.osCallErrnoMsg!(.setsockopt)( level, optname, optval.ptr, cast(socklen_t)optval.length, msg ); 535 } 536 537 /// ditto 538 void setSockOpt(T)(int level, int optname, auto ref const(T) optval, string msg="setsockopt failed") @nogc { 539 const(T)[] optvalRange = (&optval)[0..1]; 540 setSockOpt(level, optname, optvalRange, msg); 541 } 542 543 /** 544 * Call the `getsockopt` on the socket 545 * 546 * Throws ErrnoException on failure 547 */ 548 T[] getSockOpt(T)(int level, int optname, T[] optval) @nogc { 549 void[] option = optval; 550 socklen_t len = cast(socklen_t)option.length; 551 fd.osCallErrno!(.getsockopt)( level, optname, option.ptr, &len ); 552 553 return cast(T[]) option[0..len]; 554 } 555 556 /// ditto 557 void getSockOpt(T)(int level, int optname, ref T optval) @nogc if(! isArray!T) { 558 T[] optvalRange = (&optval)[0..1]; 559 getSockOpt(level, optname, optvalRange); 560 } 561 562 private: 563 @notrace static Socket socket(sa_family_t domain, int type, int protocol) @trusted @nogc { 564 int fd = .socket(domain, type, protocol); 565 errnoEnforceNGC( fd>=0, "socket creation failed" ); 566 567 return Socket( ReactorFD(fd) ); 568 } 569 570 @notrace void objectCall(alias F, T)(T* object, Parameters!F[1..$] args) @trusted @nogc { 571 auto size = F(object[0..1], args); 572 if( size!=T.sizeof ) { 573 if( size==0 ) { 574 errno = ECONNRESET; // Other side closed. Inject "Connection reset by peer" 575 throw mkExFmt!ErrnoException("%s(%s)", __traits(identifier, F), fd.get().fileNo); 576 } else { 577 throw mkExFmt!ShortRead("%s(%s)", __traits(identifier, F), fd.get().fileNo); 578 } 579 } 580 } 581 } 582 583 unittest { 584 import mecca.reactor; 585 import mecca.reactor.sync.event; 586 587 theReactor.setup(); 588 scope(success) theReactor.teardown(); 589 590 enum BUF_SIZE = 128; 591 enum NUM_BUFFERS = 16384; 592 SockAddr sa; 593 Event evt; 594 595 void server() { 596 ConnectedSocket sock = ConnectedSocket.listen( SockAddr(SockAddrIPv4.any()) ); 597 sa = sock.getLocalAddress(); 598 INFO!"Listening socket on %s"(sa.toString()); // TODO remove reliance on GC 599 evt.set(); 600 601 SockAddr clientAddr; 602 ConnectedSocket clientSock = sock.accept(clientAddr); 603 604 char[BUF_SIZE] buffer; 605 606 uint totalSize; 607 while(true) { 608 auto size = clientSock.read(buffer); 609 if(size == 0) break; 610 assertEQ( buffer[0], cast(ubyte)(totalSize / BUF_SIZE), "Got incorrect value from socket" ); 611 totalSize += size; 612 } 613 614 assertEQ( totalSize, NUM_BUFFERS * BUF_SIZE, "Did not get the expected number of bytes" ); 615 616 theReactor.stop(); 617 } 618 619 void client() { 620 evt.wait(); 621 INFO!"Connecting to %s"(sa.toString()); // TODO remove GC 622 SockAddr serverAddr = SockAddrIPv4.loopback(sa.ipv4.port); 623 ConnectedSocket sock = ConnectedSocket.connect( serverAddr ); 624 625 char[BUF_SIZE] buffer; 626 foreach( uint i; 0..NUM_BUFFERS ) { 627 buffer[] = cast(ubyte)i; 628 sock.write( buffer ); 629 } 630 } 631 632 theReactor.spawnFiber(&server); 633 theReactor.spawnFiber(&client); 634 635 theReactor.start(); 636 } 637 638 /// Reactor aware FD wrapper for files 639 struct File { 640 ReactorFD fd; 641 642 alias fd this; 643 644 /** 645 * Open a named file. 646 * 647 * Parameters are as defined for the open system call. `flags` must not have `O_CREAT` set (use the other overload for that case). 648 */ 649 void open(string pathname, int flags) @trusted @nogc { 650 DBG_ASSERT!"open called with O_CREAT but no file mode argument. Flags %x"( (flags & fcntl.O_CREAT)==0, flags ); 651 open(pathname, flags, 0); 652 } 653 654 /** 655 * Open or create a named file. 656 * 657 * Parameters are as defined for the open system call. 658 */ 659 void open(string pathname, int flags, mode_t mode) @trusted @nogc { 660 ASSERT!"open called on already open file."(!fd.isValid); 661 662 int osFd = fcntl.open(toStringzNGC(pathname), flags, mode); 663 errnoEnforceNGC( osFd>=0, "Failed to open file" ); 664 665 fd = ReactorFD(osFd); 666 } 667 } 668 669 /** 670 * An FD capable of performing sleeping operations through the reactor, when necessary 671 */ 672 struct ReactorFD { 673 private: 674 FD fd; 675 Poller.FdContext* ctx; 676 677 public: 678 @disable this(this); 679 680 /** 681 * Constructor from existing mecca.lib.FD 682 * 683 * Params: 684 * fd = bare OS fd. Ownership is handed to the ReactorFD. 685 * alreadyNonBlocking = whether the OS fd has NONBLOCKING already set on it. Setting to true saves a call to fcntl, but will hang the 686 * reactor in some cases. 687 */ 688 this(int fd, bool alreadyNonBlocking = false) @safe @nogc { 689 this( FD(fd), alreadyNonBlocking ); 690 } 691 692 /** 693 * Constructor from existing mecca.lib.FD 694 * 695 * Params: 696 * fd = an FD rvalue 697 * alreadyNonBlocking = whether the OS fd has NONBLOCKING already set on it. Setting to true saves a call to fcntl, but will hang the 698 * reactor in some cases. 699 */ 700 this(FD fd, bool alreadyNonBlocking = false) @safe @nogc { 701 move( fd, this.fd ); 702 ctx = poller.registerFD(this.fd, alreadyNonBlocking); 703 } 704 705 ~this() nothrow @safe @nogc { 706 close(); 707 } 708 709 /// Move semantics opAssign 710 ref ReactorFD opAssign(ReactorFD rhs) return nothrow @safe @nogc { 711 swap( rhs.fd, fd ); 712 swap( rhs.ctx, ctx ); 713 714 return this; 715 } 716 717 /// Cleanly closes an FD 718 void close() nothrow @safe @nogc { 719 if( fd.isValid ) { 720 DBG_ASSERT!"%s Asked to close fd %s with null context"(ctx !is null, &this, fd.fileNo); 721 722 poller.deregisterFd( fd, ctx, true ); 723 724 fd.close(); 725 ctx = null; 726 } 727 } 728 729 /// Tests for open descriptor 730 @property bool isValid() const pure nothrow @safe @nogc { 731 return fd.isValid; 732 } 733 734 /// Returns the underlying mecca.lib.io.FD 735 @property ref FD get() return nothrow @safe @nogc { 736 return fd; 737 } 738 739 /// Perform reactor aware @safe read 740 @notrace ssize_t read(void[] buffer, Timeout timeout = Timeout.infinite) @trusted @nogc { 741 return blockingCall!(unistd.read)( Direction.Read, buffer.ptr, buffer.length, timeout ); 742 } 743 744 /// ditto 745 @notrace ssize_t read(T)(T* ptr, Timeout timeout = Timeout.infinite) @trusted @nogc { 746 return read(ptr[0..1], timeout); 747 } 748 749 /// Perform reactor aware @safe write 750 @notrace ssize_t write(const void[] buffer, Timeout timeout = Timeout.infinite) @trusted @nogc { 751 return blockingCall!(unistd.write)( Direction.Write, buffer.ptr, buffer.length, timeout ); 752 } 753 754 /// ditto 755 @notrace ssize_t write(T)(const(T)* buffer, Timeout timeout = Timeout.infinite) @trusted @nogc { 756 return write(buffer[0..1], timeout); 757 } 758 759 alias fcntl = osCallErrno!(.fcntl.fcntl); 760 alias ioctl = osCallErrno!(.ioctl); 761 762 /** Take an FD out of the control of the reactor 763 * 764 * This has the same effect as close, except the fd itself remains open. 765 * 766 * Returns: 767 * The FD (rvalue) controlling the underlying OS fd. 768 */ 769 FD passivify() @safe @nogc { 770 if( !fd.isValid ) 771 return FD(); 772 773 poller.deregisterFd( fd, ctx ); 774 ctx = null; 775 776 return move(fd); 777 } 778 779 /** 780 * Register a user callback to be called if the FD is "active" 781 * 782 * $(B Warning): Using this function without understanding the underlying mechanism might cause the callback to not 783 * get called. Use with caution! 784 * 785 * Registers a callback to be called the next time an event is available on the file descriptor. The semantics of 786 * when the callback will be called follow the same rules as epoll's edge trigger mode. This means that if the 787 * last operation performed with the FD did not block, the callback will not be called. 788 * 789 * One way to make sure this doesn't happen is to call a blocking function with a timeout of zero. 790 * 791 * Use `unregisterCallback` to unregister the callback. 792 * 793 * Params: 794 * dir = direction (`Read`/`Write`) to register. The `Direction.Both` is illegal here. 795 * dlg = the delegate to be called 796 * opaq = a value that will be passed, as is, to the delegate. 797 * oneShot = if set to `true`, the callback will automatically be deregistered after being called once. 798 */ 799 void registerCallback(Direction dir, void delegate(void*) dlg, void* opaq, bool oneShot = true) nothrow @safe @nogc { 800 poller.registerFdCallback(ctx, dir, dlg, opaq, oneShot); 801 } 802 803 void unregisterCallback(Direction dir) nothrow @safe @nogc { 804 poller.unregisterFdCallback(ctx, dir); 805 } 806 807 package(mecca.reactor): 808 auto blockingCall(alias F)(Direction dir, Parameters!F[1 .. $] args, Timeout timeout) @system @nogc { 809 static assert (is(Parameters!F[0] == int)); 810 static assert (isSigned!(ReturnType!F)); 811 812 while (true) { 813 auto ret = fd.osCall!F(args); 814 if (ret < 0) { 815 if (errno == EAGAIN || errno == EWOULDBLOCK) { 816 poller.waitForEvent(ctx, fd.fileNo, dir, timeout); 817 } 818 else { 819 throw mkExFmt!ErrnoException("%s(%s)", __traits(identifier, F), fd.fileNo); 820 } 821 } 822 else { 823 return ret; 824 } 825 } 826 } 827 828 auto osCall(alias F)(Parameters!F[1..$] args) nothrow @system @nogc { 829 return fd.osCall!F(args); 830 } 831 832 auto osCallErrno(alias F)(Parameters!F[1..$] args) @system @nogc if(isSigned!(ReturnType!F) && isIntegral!(ReturnType!F)) { 833 enum FuncFullName = fullyQualifiedName!F; 834 835 import std..string : lastIndexOf; 836 enum FuncName = FuncFullName[ lastIndexOf(FuncFullName, '.')+1 .. $ ]; 837 838 enum ErrorMessage = "Running " ~ FuncName ~ " failed"; 839 return osCallErrnoMsg!F(args, ErrorMessage); 840 } 841 842 auto osCallErrnoMsg(alias F)(Parameters!F[1..$] args, string msg) @system @nogc 843 if(isSigned!(ReturnType!F) && isIntegral!(ReturnType!F)) 844 { 845 alias RetType = ReturnType!F; 846 RetType ret = fd.checkedCall!F(args, msg); 847 848 return ret; 849 } 850 } 851 852 void _openReactorEpoll() { 853 poller.open(); 854 } 855 856 void _closeReactorEpoll() { 857 poller.close(); 858 } 859 860 version(unittest): 861 private class UnitTest { 862 import core.sys.posix.sys.types; 863 864 import mecca.lib.consts; 865 import mecca.reactor; 866 import mecca.runtime.ut; 867 868 void testBody() { 869 FD pipeReadFD, pipeWriteFD; 870 createPipe(pipeReadFD, pipeWriteFD); 871 ReactorFD pipeRead = ReactorFD(move(pipeReadFD)); 872 ReactorFD pipeWrite = ReactorFD(move(pipeWriteFD)); 873 874 void reader() { 875 uint[1024] buffer; 876 enum BUFF_SIZE = typeof(buffer).sizeof; 877 uint lastNum = -1; 878 879 // Send 2MB over the pipe 880 ssize_t res; 881 while((res = pipeRead.read(buffer))>0) { 882 assert(res==BUFF_SIZE, "Short read from pipe"); 883 assert(buffer[0] == ++lastNum, "Read incorrect value from buffer"); 884 } 885 886 errnoEnforceNGC(res==0, "Read failed from pipe"); 887 INFO!"Reader finished"(); 888 theReactor.stop(); 889 } 890 891 void writer() { 892 uint[1024] buffer; 893 enum BUFF_SIZE = typeof(buffer).sizeof; 894 895 // Send 2MB over the pipe 896 while(buffer[0] < (2*MB/BUFF_SIZE)) { 897 ssize_t res = pipeWrite.write(buffer); 898 errnoEnforceNGC( res>=0, "Write failed on pipe"); 899 assert( res==BUFF_SIZE, "Short write to pipe" ); 900 buffer[0]++; 901 } 902 903 INFO!"Writer finished - closing pipe"(); 904 pipeWrite.close(); 905 } 906 907 theReactor.spawnFiber(&reader); 908 theReactor.spawnFiber(&writer); 909 910 theReactor.start(); 911 } 912 913 @mecca_ut: 914 void normalPoller() { 915 theReactor.setup(); 916 scope(success) theReactor.teardown(); 917 918 testBody(); 919 } 920 921 void timedPoller() { 922 Reactor.OpenOptions options; 923 924 options.registerDefaultIdler = false; 925 theReactor.setup(options); 926 scope(success) theReactor.teardown(); 927 928 theReactor.registerRecurringTimer(1.msecs, &poller.poll); 929 930 testBody(); 931 } 932 933 mixin TEST_FIXTURE!UnitTest; 934 }