/// File descriptor management
module mecca.lib.io;

// Licensed under the Boost license. Full copyright information in the AUTHORS file

import core.stdc.errno;
import core.sys.posix.unistd;
public import core.sys.posix.fcntl :
    O_ACCMODE, O_RDONLY, O_WRONLY, O_RDWR, O_CREAT, O_EXCL, O_TRUNC, O_APPEND, O_SYNC, O_NOCTTY;
    /* O_NOATIME, O_NOFOLLOW not included because not defined */
import core.sys.posix.fcntl;
import std.algorithm : min, move;
import std.conv;
import std.traits;

import mecca.lib.exception;
import mecca.lib.memory;
import mecca.lib.string;
import mecca.log;
import mecca.platform.x86;
import mecca.platform.os : EREMOTEIO, O_CLOEXEC;

/// Exception thrown if a read/recv receives partial data
///
/// `ErrnoException.errno` will report `EREMOTEIO`.
class ShortRead : ErrnoException {
    /// Construct the exception with `msg`; the errno value is always set to `EREMOTEIO`.
    this(string msg, string file = __FILE_FULL_PATH__, size_t line = __LINE__) @trusted {
        super(msg, EREMOTEIO, file, line);
    }
}

unittest {
    auto except = new ShortRead("Message");
    assert(except.errno == EREMOTEIO);
}

// Local declaration of pipe2(2), used by createPipe below.
private extern(C) nothrow @trusted @nogc {
    int pipe2(ref int[2], int flags);
}

/**
 * File descriptor wrapper
 *
 * This wrapper's main purpose is to protect the fd against leakage. It does not actually $(I do) anything.
 *
 * The struct is not copyable. The wrapped descriptor is closed when the struct is destroyed.
 */
struct FD {
private:
    enum InvalidFd = -1;
    int fd = InvalidFd;

public:
    @disable this(this);

    /**
     * Initialize from an OS file descriptor.
     *
     * Params:
     *  fd = OS handle of file to wrap. Must be non-negative (checked with `ASSERT`).
     */
    this(int fd) nothrow @safe @nogc {
        ASSERT!"FD initialized with an invalid FD %s"(fd>=0, fd);
        this.fd = fd;
    }

    /**
     * Open a new file.
     *
     * Params:
     *  path = path of the file to open.
     *  flags = same as for the `open`(2) call. `O_CLOEXEC` is always added.
     *  mode = same as for the `open`(2) call.
     *
     * Throws:
     *  Reports failure through `errnoEnforceNGC` if the underlying `open` returns a negative value.
     */
    this(string path, int flags, mode_t mode = octal!666) @trusted @nogc {
        fd = .open(path.toStringzNGC, flags | O_CLOEXEC, mode);
        errnoEnforceNGC(fd>=0, "File open failed");
    }

    /// Closes the wrapped descriptor, if any.
    ~this() nothrow @safe @nogc {
        close();
    }

    /**
     * Wrapper for adopting an fd immediately after being returned from external function
     *
     * The usage should be `FD fd = FD.adopt!"open"( .open("/path/to/file", O_RDWR) );`
     *
     * Params:
     *  errorMsg = (template) message to attach to the exception if `fd` is invalid.
     *  fd = raw descriptor, as returned by the external function.
     *
     * Throws:
     * ErrnoException in case fd is invalid
     */
    @notrace static FD adopt(string errorMsg)(int fd) @safe @nogc {
        errnoEnforceNGC(fd>=0, errorMsg);
        return FD(fd);
    }

    /**
     * Call an OS function that accepts an FD as the first argument.
     *
     * Params:
     *  F = (template) the OS function to call. Its first parameter must be an `int`. Passing `close` is rejected at
     *      compile time.
     *  args = the arguments that OS function accepts without the first one (the file descriptor).
     *
     * Returns:
     * Whatever the original OS function returns.
     */
    auto osCall(alias F, T...)(T args) nothrow @nogc if( is( Parameters!F[0]==int ) ) {
        // We're using T... rather than Parameters!F because the later does not handle variadic functions very well.
        // XXX Consider having two versions of this function
        import mecca.lib.reflection : as;

        static assert( fullyQualifiedName!F != fullyQualifiedName!(.close),
                "Do not try to close the fd directly. Use FD.close instead." );
        ReturnType!F res;
        as!"nothrow @nogc"({ res = F(fd, args); });

        return res;
    }

    /// @safe read
    ///
    /// Calls the OS `read` on the wrapped descriptor with `buffer`. Failure (a return of -1) is reported through
    /// `checkedCall`.
    auto read(void[] buffer) @trusted @nogc {
        return checkedCall!(core.sys.posix.unistd.read)(buffer.ptr, buffer.length, "read failed");
    }

    /// @safe write
    ///
    /// Calls the OS `write` on the wrapped descriptor with `buffer`. Failure (a return of -1) is reported through
    /// `checkedCall`.
    auto write(const void[] buffer) @trusted @nogc {
        return checkedCall!(core.sys.posix.unistd.write)(buffer.ptr, buffer.length, "write failed");
    }

    /**
     * Run an fd based function and throw if it fails
     *
     * This function behave the same as `osCall`, except if the return is -1, it will throw an ErrnoException
     *
     * Params:
     *  F = (template) the OS function to call. Its first parameter must be an `int`.
     *  args = the arguments to pass to `F` after the file descriptor.
     *  errorMessage = message to attach to the exception on failure.
     *
     * Returns:
     * Whatever `F` returned, when that is not -1.
     */
    auto checkedCall(alias F, T...)(T args, string errorMessage) @system @nogc
            if( is( Parameters!F[0]==int ) )
    {
        auto ret = osCall!F(args);

        errnoEnforceNGC(ret!=-1, errorMessage);

        return ret;
    }

    /**
     * Close the OS handle prematurely.
     *
     * Closes the OS handle. This happens automatically on struct destruction. It is only necessary to call this method if you wish to close
     * the underlying FD before the struct goes out of scope.
     *
     * Calling it on an FD that holds no descriptor is a no-op.
     *
     * Throws:
     * Nothing. There is nothing useful to do if close fails.
     */
    void close() nothrow @safe @nogc {
        if( fd != InvalidFd ) {
            .close(fd);
        }

        fd = InvalidFd;
    }

    /**
     * Obtain the underlying OS handle
     *
     * This returns the underlying OS handle for use directly with OS calls.
     *
     * Warning:
     * Do not use this function to directly call the close system call. Doing so may lead to quite difficult to debug problems across your
     * program. If another part of the program gets the same FD number, it can be quite difficult to find out what went wrong.
     */
    @property int fileNo() pure const nothrow @safe @nogc {
        return fd;
    }

    /**
     * Report whether the FD currently holds a valid fd
     *
     * Additional_Details:
     * Hold stick near centre of its length. Moisten pointed end in mouth. Insert in tooth space, blunt end next to gum. Use gentle in-out
     * motion.
     *
     * See_Also:
     * <a href="http://hitchhikers.wikia.com/wiki/Wonko_the_Sane">Wonko the sane</a>
     *
     * Returns: true if valid
     */
    @property bool isValid() pure const nothrow @safe @nogc {
        return fd != InvalidFd;
    }

    /** Duplicate an FD
     *
     * Does the same as the `dup` system call. Implemented with `fcntl(F_DUPFD_CLOEXEC)`.
     *
     * Returns:
     * An FD representing a duplicate of the current FD.
     *
     * Throws:
     * Reports failure through `errnoEnforceNGC` if the `fcntl` call returns -1.
     */
    @notrace FD dup() @trusted @nogc {
        import core.sys.posix.fcntl : fcntl;
        import mecca.platform.os : F_DUPFD_CLOEXEC;

        int newFd = osCall!(fcntl)( F_DUPFD_CLOEXEC, 0 );
        errnoEnforceNGC(newFd!=-1, "Failed to duplicate FD");
        return FD( newFd );
    }
}

/**
 * create an unnamed pipe pair
 *
 * The pipe is created with `pipe2` and the `O_CLOEXEC` flag.
 *
 * Params:
 *  readEnd = `FD` struct to receive the reading (output) end of the pipe
 *  writeEnd = `FD` struct to receive the writing (input) end of the pipe
 *
 * Throws:
 *  Reports failure through `errnoEnforceNGC` if `pipe2` returns a negative value.
 */
void createPipe(out FD readEnd, out FD writeEnd) @trusted @nogc {
    int[2] pipeRawFD;

    errnoEnforceNGC( pipe2(pipeRawFD, O_CLOEXEC )>=0, "OS pipe creation failed" );

    readEnd = FD( pipeRawFD[0] );
    writeEnd = FD( pipeRawFD[1] );
}

unittest {
    import core.stdc.errno;
    import core.sys.posix.fcntl;
    import std.conv;

    int fd1copy, fd2copy;

    {
        auto fd = FD.adopt!"open"(open("/tmp/meccaUTfile1", O_CREAT|O_RDWR|O_TRUNC, octal!666));
        fd1copy = fd.fileNo;

        fd.osCall!write("Hello, world\n".ptr, 13);
        // The following line should not compile:
        // fd.osCall!(.close)();

        unlink("/tmp/meccaUTfile1");

        // Re-assigning must close the first descriptor
        fd = FD.adopt!"open"( open("/tmp/meccaUTfile2", O_CREAT|O_RDWR|O_TRUNC, octal!666) );
        fd2copy = fd.fileNo;

        unlink("/tmp/meccaUTfile2");
    }

    // Both descriptors must already be closed, so closing them again must fail with EBADF
    assert( .close(fd1copy)<0 && errno==EBADF, "FD1 was not closed" );
    assert( .close(fd2copy)<0 && errno==EBADF, "FD2 was not closed" );
}

/// A
/// wrapper to perform buffered IO over another IO type
struct BufferedIO(T) {
    enum MIN_BUFF_SIZE = 128;

private:
    MmapArray!ubyte rawMemory;
    void[] readBuffer, writeBuffer;
    size_t readBufferSize;
    public T fd; // Declared public so it is visible through alias this

public:
    // BufferedIO is not copyable even if T is copyable (which it typically won't be)
    @disable this(this);

    /** Construct an initialized buffered IO object
     *
     * `fd` is the FD object to wrap. Other arguments are the same as for the `open` call.
     */
    this(T fd, size_t bufferSize) {
        this.open(bufferSize);
        this.fd = move(fd);
    }

    /// ditto
    this(T fd, size_t readBufferSize, size_t writeBufferSize) {
        this.open(readBufferSize, writeBufferSize);
        this.fd = move(fd);
    }

    /// Struct destructor
    ///
    /// Warning:
    /// $(B The destructor does not flush outstanding writes). This is because it might be called from an exception
    /// context where such flushes are not possible. Adding `scope(success) io.flush();` is recommended.
    ~this() @safe @nogc {
        closeNoFlush();
        // No point in closing the underlying FD. Its own destructor should do that.
    }

    /**
     * Prepare the buffers.
     *
     * The first form sets the same buffer size for both read and write operations. The second sets the read and write
     * buffer sizes independently.
     *
     * Both sizes must be at least `MIN_BUFF_SIZE`. The actual buffers may end up larger than requested, as the total
     * allocation is rounded up to a whole number of pages.
     */
    void open(size_t bufferSize) @safe @nogc {
        open( bufferSize, bufferSize );
    }

    /// ditto
    void open(size_t readBufferSize, size_t writeBufferSize) @safe @nogc {
        ASSERT!"BufferedIO.open called twice"( rawMemory.closed );
        assertGE( readBufferSize, MIN_BUFF_SIZE, "readBufferSize not big enough" );
        assertGE( writeBufferSize, MIN_BUFF_SIZE, "writeBufferSize not big enough" );

        // Round readBufferSize to a multiple of the cacheline size, so that the write buffer be cache aligned
        readBufferSize += CACHE_LINE_SIZE-1;
        readBufferSize -= readBufferSize % CACHE_LINE_SIZE;

        size_t total = readBufferSize + writeBufferSize;

        // Round size up to next page
        total += SYS_PAGE_SIZE - 1;
        total -= total % SYS_PAGE_SIZE;

        rawMemory.allocate( total, false ); // We do NOT want the GC to scan this area

        // Hand (at most) half of the page-rounding slack to the read buffer, keeping the boundary cacheline aligned
        size_t added = total - readBufferSize - writeBufferSize;
        added /= 2;
        added -= added % CACHE_LINE_SIZE;
        this.readBufferSize = readBufferSize + added;

        readBuffer = null;
        writeBuffer = null;
    }

    /** Close the buffered IO
     *
     * This flushes all outstanding writes, closes the underlying FD and releases the buffers.
     */
    void close() @notrace {
        flush();
        closeNoFlush();
        fd.close();
    }

    /** Forget all pending writes
     *
     * This causes the `BufferedIO` to discard all unflushed write data. Cached read data is unaffected.
     */
    void reset() @safe @nogc nothrow pure {
        writeBuffer = null;
    }

    /// Perform @safe buffered read
    ///
    /// Notice that if there is data already in the buffers, that data is what will be returned, even if the read
    /// requested more (partial result).
    ///
    /// A request with a zero-length `buffer` returns 0 immediately. It neither touches the cached data nor calls the
    /// underlying FD.
    ///
    /// Params:
    ///  buffer = destination for the data.
    ///  args = extra arguments passed as-is to the underlying FD's `read`, if any.
    ///
    /// Returns:
    ///  Number of bytes placed in `buffer`. 0 if the underlying FD's `read` returned 0.
    auto read(ARGS...)(void[] buffer, ARGS args) @trusted @nogc {
        size_t cachedLength = min(buffer.length, readBuffer.length);
        // The zero-length case must be answered here. Letting it fall through would refill the raw buffer, throwing
        // away any cached data, and then recurse with the same empty request until the underlying FD reports EOF.
        if( cachedLength>0 || buffer.length==0 ) {
            // Data already in buffer (or nothing was asked for)
            buffer[0..cachedLength][] = readBuffer[0..cachedLength][];
            readBuffer = readBuffer[cachedLength..$];

            return cachedLength;
        }

        cachedLength = fd.read(rawReadBuffer, args);
        readBuffer = rawReadBuffer[0..cachedLength];

        if( cachedLength==0 )
            return 0;

        // Call ourselves again. Since the buffer is now not empty, the call should succeed without performing the
        // actual underlying read again.
        return read(buffer, args);
    }

    /** Read a single line
     *
     * Reads a single line from the buffered IO. The line must fit the struct's allocated read buffer.
     *
     * Returns:
     * The function returns a `const(char)[]` that points to the file's read buffer. This means that no copy is done
     * (the data is read directly to its final resting place), but it also means that the data might disappear if
     * further read operations are done on the file.
     *
     * Upon successful read, the returned range contains the terminating character as its last element. See Notes below
     * for discussion on partial lines.
     *
     * In case of end of file, the function returns a slice of length 0.
     *
     * Params:
     * terminator = the line terminator to look for.
     * partialOk = whether it is okay for the line to be partial. Defaults to `false`.
     * args = the arguments for the underlying FD, if any. Can be used to pass `Timeout` to a `ReactorFD`.
     *
     * Notes:
     * A partial line might happen in one of two cases. Either the line width is longer than the entire read buffer
     * (as set when calling `open` or during construction), or the file sends an EOF without terminating the last line.
     *
     * What happens in that case depends on whether `partialOk` is set. If it is, the buffer is returned with no
     * terminator as the last character. If `partialOk` is clear (the default) then the call throws a `ShortRead`
     * exception.
     */
    const(char)[] readLine(char terminator = '\n') @trusted @nogc {
        return readLine(terminator, false);
    }

    /// ditto
    const(char)[] readLine(ARGS...)(char terminator, bool partialOk, ARGS args) @trusted @nogc {
        // Returns the index just past the first terminator found at or after `start`, or 0 if there is none.
        size_t readLineFromBuffer(size_t start, char terminator) @trusted @nogc nothrow {
            const(char)[] convertedBuffer = cast(const(char)[])readBuffer;

            // Can't use std.algorithm.searching.find, as it's a GC function. Of course, the only reason it allocates is
            // because it needs to throw range.empty if not found. Since we would then have to also catch that exception,
            // using it quickly spirals out of control.
            //
            // Instead, we're going to spend the 4 lines it takes to roll our own :-(
            foreach(i; start..readBuffer.length) {
                if( convertedBuffer[i]==terminator ) {
                    return i+1;
                }
            }

            return 0;
        }

        // Fast path - the entire line is already present in the buffer
        size_t intermediateLength = readLineFromBuffer(0, terminator);
        if( intermediateLength!=0 ) {
            const(char)[] result = cast(const(char)[])readBuffer[0..intermediateLength];
            readBuffer = readBuffer[intermediateLength..$];

            return result;
        }

        // Move the buffer we already have to the beginning of the memory, to maximize the efficiency of the next read
        if( readBuffer.ptr!=rawReadBuffer.ptr ) {
            // No point in moving memory if it's already at the beginning
            foreach(i; 0..readBuffer.length) {
                // Can't use array operations because the ranges might overlap
                (cast(char[])rawReadBuffer)[i] = (cast(const(char)[])readBuffer)[i];
            }

            readBuffer = rawReadBuffer[0..readBuffer.length];
        }

        // Hands back everything cached as an unterminated line, or throws ShortRead if partial lines are not allowed
        const(char)[] partialResult() {
            if( partialOk ) {
                const(char)[] result = (cast(const(char)[])readBuffer);
                readBuffer = null;

                return result;
            } else {
                throw mkExFmt!ShortRead("readline unable to find terminator in %s bytes", readBuffer.length);
            }
        }

        intermediateLength = readBuffer.length;
        size_t numRead;
        size_t terminatorLocation;
        do {
            // numRead is 0 on the first iteration, so this only advances past data already scanned for a terminator
            intermediateLength += numRead;
            if( readBuffer.length==readBufferSize ) {
                return partialResult(); // No more room to expand
            }

            numRead = fd.read(rawReadBuffer[intermediateLength..$], args);
            if( numRead==0 )
                return partialResult(); // EOF

            readBuffer = rawReadBuffer[0..intermediateLength + numRead];
        } while( (terminatorLocation = readLineFromBuffer(intermediateLength, terminator))==0 );

        DBG_ASSERT!"Terminator not found in successful loop completion: loc %s str \"%s\""(
                (cast(const(char)[])readBuffer)[terminatorLocation-1]==terminator, terminatorLocation,
                (cast(const(char)[])readBuffer)[0..terminatorLocation]);
        auto result = (cast(const(char)[])readBuffer)[0..terminatorLocation];
        readBuffer = readBuffer[terminatorLocation..$];

        return result;
    }

    /// Perform @safe buffered write
    ///
    /// Function does not return until all data is either written to FD or buffered
    ///
    /// Params:
    ///  buffer = the data to write.
    ///  args = extra arguments passed as-is to the underlying FD's `write`, if any.
    void write(ARGS...)(const(void)[] buffer, ARGS args) @trusted @nogc {
        if( buffer.length>rawWriteBuffer.length ) {
            // Buffer is big - write it directly to save on copies
            flush(args);
            DBG_ASSERT!"write buffer is not empty after flush"(writeBuffer.length == 0);
            while( buffer.length>0 ) {
                auto numWritten = fd.write(buffer, args);
                buffer = buffer[numWritten..$];
            }

            return;
        }

        while( buffer.length>0 ) {
            // Append as much as fits after the data already pending
            size_t start = writeBuffer.length;
            size_t writeSize = rawWriteBuffer.length - start;
            writeSize = min(writeSize, buffer.length);

            writeBuffer = rawWriteBuffer[0 .. start+writeSize];
            writeBuffer[start..$][] = buffer[0..writeSize][];

            buffer = buffer[writeSize..$];

            if( writeBuffer.length==rawWriteBuffer.length )
                flush(args);
        }
    }

    /// Flush the write buffers
    ///
    /// Keeps calling the underlying FD's `write` until no pending data remains.
    ///
    /// Params:
    ///  args = extra arguments passed as-is to the underlying FD's `write`, if any.
    void flush(ARGS...)(ARGS args) @trusted @nogc {
        scope(failure) {
            /* In case of mid-op failure, writeBuffer might end up not at the start of rawBuffer, which violates
             * invariants assumed elsewhere in the code.
             */
            auto len = writeBuffer.length;
            // Source and destination buffers may overlap, so we cannot use slice operation for the copy
            foreach( i, d; cast(ubyte[])writeBuffer )
                (cast(ubyte[])rawWriteBuffer)[i] = d;
            writeBuffer = rawWriteBuffer[0..len];
        }

        while( writeBuffer.length>0 ) {
            auto numWritten = fd.write(writeBuffer, args);
            writeBuffer = writeBuffer[numWritten..$];
        }
    }

    /** Attach an underlying FD to the buffered IO instance
     *
     * Instance must be open and not already attached
     */
    ref BufferedIO opAssign(T fd) {
        ASSERT!"Attaching fd to an open buffered IO"(!this.fd.isValid);
        ASSERT!"Trying to attach an fd to a closed BufferedIO"( !rawMemory.closed );
        move(fd, this.fd);

        return this;
    }

    alias fd this;
private:
    // Size of the write part of the raw memory
    size_t writeBufferSize() const pure nothrow @safe @nogc {
        return rawMemory.length - readBufferSize;
    }

    // The part of the raw memory reserved for reading
    @property void[] rawReadBuffer() pure nothrow @safe @nogc {
        DBG_ASSERT!"Did not call open"(readBufferSize!=0);
        DBG_ASSERT!"readBufferSize greater than total raw memory. %s<%s"(
                readBufferSize<rawMemory.length, readBufferSize, rawMemory.length );
        return rawMemory[0..readBufferSize];
    }

    // The part of the raw memory reserved for writing
    @property void[] rawWriteBuffer() pure nothrow @safe @nogc {
        DBG_ASSERT!"readBufferSize greater than total raw memory. %s<%s"(
                readBufferSize<rawMemory.length, readBufferSize, rawMemory.length );
        return rawMemory[readBufferSize..$];
    }

    // Release the buffers without flushing. Pending write data is reported with ERROR and then dropped.
    @notrace void closeNoFlush() @safe @nogc {
        if( writeBuffer.length!=0 ) {
            ERROR!"Closing BufferedIO while it still has unflushed data to write"();
        }
        rawMemory.free();
        readBufferSize = 0;
        readBuffer = null;
        writeBuffer = null;
    }
}

unittest {
    enum TestSize = 32000;
    enum ReadBuffSize = 2000;
    enum WriteBuffSize = 2000;

    ubyte[] reference;
    uint numReads, numWrites;

    struct MockFD {
        uint readOffset, writeOffset;
        bool opened = true;

        ssize_t read(void[] buffer) @nogc {
            auto len = min(buffer.length, reference.length - readOffset);
            buffer[0..len] = reference[readOffset..readOffset+len][];
            readOffset += len;
            if( len>0 )
                numReads++;

            return len;
        }

        size_t write(const void[] buffer) @nogc {
            foreach(datum; cast(ubyte[])buffer) {
                assertEQ(datum, cast(ubyte)(reference[writeOffset]+1));
                writeOffset++;
            }

            numWrites++;

            return buffer.length;
        }

        @property bool isValid() const @nogc {
            return opened;
        }

        void close() @nogc {
            opened = false;
        }
    }

    BufferedIO!MockFD fd;

    import std.random;
    auto seed = unpredictableSeed;
    scope(failure) ERROR!"Test failed with seed %s"(seed);
    auto rand = Random(seed);

    reference.length = TestSize;
    foreach(ref d; reference) {
        d = uniform!ubyte(rand);
    }

    fd.open(ReadBuffSize, WriteBuffSize);

    ubyte[17] buffer;

    // A zero-length read must return 0 without consuming anything from the underlying FD
    assertEQ(fd.read(buffer[0..0]), 0, "Zero length read returned data");
    assertEQ(numReads, 0, "Zero length read accessed the underlying FD");

    ssize_t numRead;
    size_t total;
    while( (numRead = fd.read(buffer))>0 ) {
        total+=numRead;
        buffer[0..numRead][] += 1;
        fd.write(buffer[0..numRead]);
    }

    fd.flush();

    assertEQ(total, TestSize, "Incorrect total number of bytes processed");
    assertEQ(fd.readOffset, TestSize, "Did not read correct number of bytes");
    assertEQ(fd.writeOffset, TestSize, "Did not write correct number of bytes");
    assertEQ(numReads, 16);
    assertEQ(numWrites, 16);
}

unittest {
    import mecca.lib.time;
    import mecca.reactor;
    import mecca.reactor.io.fd;
    import mecca.reactor.subsystems.poller;

    void testBody() {
        FD pipeReadFD, pipeWriteFD;
        createPipe(pipeReadFD, pipeWriteFD);

        BufferedIO!ReactorFD pipeRead = BufferedIO!ReactorFD( ReactorFD(move(pipeReadFD)), 128, 4096-128);
        ReactorFD pipeWrite = ReactorFD(move(pipeWriteFD));

        uint numLinesRead, numLinesFailed;

        auto timeout = Timeout(300.msecs);

        enum string[] expectedLines = [ "First line", "Second line", "", "Fourth line",
             "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" ~
                 "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
             "aaaaaaaaaaaa", "" ];
        void reader() {
            const(char)[] line;
            int i;
            do {
                line = pipeRead.readLine('\n', true, timeout);
                if( line.length>0 && line[$-1]=='\n' ) {
                    DEBUG!"Read \"%s\" expected \"%s\""( line[0..$-1], expectedLines[i] );
                    assertEQ( expectedLines[i], line[0..$-1], ": Read the wrong data" );
                    ++i;
                    numLinesRead++;
                } else {
                    DEBUG!"Read \"%s\" expected \"%s\""( line[0..$], expectedLines[i] );
                    assertEQ( expectedLines[i], line[0..$], ": Read the wrong data" );
                    ++i;
                    numLinesFailed++;
                }
            } while(line.length>0);
        }

        theReactor.spawnFiber(&reader);

        void writeData(string data) {
            pipeWrite.write(data, timeout);
            poller.poll(); // Otherwise the other fiber won't get scheduled
            theReactor.yield();
        }

        // Write three complete lines (the third one empty) and one incomplete one
        writeData("First line\nSecond line\n\nFour");
        assertEQ(numLinesRead, 3, "Read lines count incorrect");
        assertEQ(numLinesFailed, 0, "Failed lines count incorrect");

        writeData("th lin");
        assertEQ(numLinesRead, 3, "Read lines count incorrect");
        assertEQ(numLinesFailed, 0, "Failed lines count incorrect");

        writeData("e\n");
        assertEQ(numLinesRead, 4, "Read lines count incorrect");
        assertEQ(numLinesFailed, 0, "Failed lines count incorrect");

        writeData("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); // 70 "a"s
        assertEQ(numLinesRead, 4, "Read lines count incorrect");
        assertEQ(numLinesFailed, 0, "Failed lines count incorrect");

        writeData("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); // 70 more "a"s
        assertEQ(numLinesRead, 4, "Read lines count incorrect");
        assertEQ(numLinesFailed, 1, "Failed lines count incorrect");

        // Closing the write end makes the reader see EOF
        destroy(pipeWrite);
        poller.poll(); // Otherwise the other fiber won't get scheduled
        theReactor.yield();

        assertEQ(numLinesRead, 4, "Read lines count incorrect");
        assertEQ(numLinesFailed, 3, "Failed lines count incorrect");
    }

    testWithReactor(&testBody);
}