1 /// File descriptor management
2 module mecca.lib.io;
3 
4 // Licensed under the Boost license. Full copyright information in the AUTHORS file
5 
6 import core.stdc.errno;
7 import core.sys.posix.unistd;
8 public import core.sys.posix.fcntl :
9     O_ACCMODE, O_RDONLY, O_WRONLY, O_RDWR, O_CREAT, O_EXCL, O_TRUNC, O_APPEND, O_SYNC, O_NOCTTY;
10     /* O_NOATIME, O_NOFOLLOW not included because not defined */
11 import core.sys.posix.fcntl;
12 import std.algorithm : min, move;
13 import std.conv;
14 import std.traits;
15 
16 import mecca.lib.exception;
17 import mecca.lib.memory;
18 import mecca.lib..string;
19 import mecca.log;
20 import mecca.platform.x86;
21 import mecca.platform.os : EREMOTEIO, O_CLOEXEC;
22 
23 /// Exception thrown if a read/recv receives partial data
24 ///
25 /// `ErrnoException.errno` will report `EREMOTEIO`.
26 class ShortRead : ErrnoException {
27     this(string msg, string file = __FILE_FULL_PATH__, size_t line = __LINE__) @trusted {
28         super(msg, EREMOTEIO, file, line);
29     }
30 }
31 
32 unittest {
33     auto except = new ShortRead("Message");
34     assert(except.errno == EREMOTEIO);
35 }
36 
37 private extern(C) nothrow @trusted @nogc {
38     int pipe2(ref int[2], int flags);
39 }
40 
41 /**
42  * File descriptor wrapper
43  *
44  * This wrapper's main purpose is to protect the fd against leakage. It does not actually $(I do) anything.
45  */
46 struct FD {
47 private:
48     enum InvalidFd = -1;
49     int fd = InvalidFd;
50 
51 public:
52     @disable this(this);
53 
54     /**
55      * Initialize from an OS file descriptor.
56      *
57      * Params:
58      *  fd = OS handle of file to wrap.
59      */
60     this(int fd) nothrow @safe @nogc {
61         ASSERT!"FD initialized with an invalid FD %s"(fd>=0, fd);
62         this.fd = fd;
63     }
64 
65     /**
66      * Open a new file.
67      *
68      * Parameters:
69      * Same as for the `open`(2) command;
70      */
71     this(string path, int flags, mode_t mode = octal!666) @trusted @nogc {
72         fd = .open(path.toStringzNGC, flags | O_CLOEXEC, mode);
73         errnoEnforceNGC(fd>=0, "File open failed");
74     }
75 
76     ~this() nothrow @safe @nogc {
77         close();
78     }
79 
80     /**
81      * Wrapper for adopting an fd immediately after being returned from external function
82      *
83      * The usage should be `FD fd = FD.adopt!"open"( .open("/path/to/file", O_RDWR) );`
84      *
85      * Throws:
86      * ErrnoException in case fd is invalid
87      */
88     @notrace static FD adopt(string errorMsg)(int fd) @safe @nogc {
89         errnoEnforceNGC(fd>=0, errorMsg);
90         return FD(fd);
91     }
92 
93     /**
94      * Call an OS function that accepts an FD as the first argument.
95      *
96      * Parameters:
97      * The parameters are the arguments that OS function accepts without the first one (the file descriptor).
98      *
99      * Returns:
100      * Whatever the original OS function returns.
101      */
102     auto osCall(alias F, T...)(T args) nothrow @nogc if( is( Parameters!F[0]==int ) ) {
103         // We're using T... rather than Parameters!F because the later does not handle variadic functions very well.
104         // XXX Consider having two versions of this function
105         import mecca.lib.reflection : as;
106 
107         static assert( fullyQualifiedName!F != fullyQualifiedName!(.close),
108                 "Do not try to close the fd directly. Use FD.close instead." );
109         ReturnType!F res;
110         as!"nothrow @nogc"({ res = F(fd, args); });
111 
112         return res;
113     }
114 
115     /// @safe read
116     auto read(void[] buffer) @trusted @nogc {
117         return checkedCall!(core.sys.posix.unistd.read)(buffer.ptr, buffer.length, "read failed");
118     }
119 
120     /// @safe write
121     auto write(const void[] buffer) @trusted @nogc {
122         return checkedCall!(core.sys.posix.unistd.write)(buffer.ptr, buffer.length, "write failed");
123     }
124 
125     /**
126      * Run an fd based function and throw if it fails
127      *
128      * This function behave the same as `osCall`, except if the return is -1, it will throw an ErrnoException
129      */
130     auto checkedCall(alias F, T...)(T args, string errorMessage) @system @nogc
131             if( is( Parameters!F[0]==int ) )
132     {
133         auto ret = osCall!F(args);
134 
135         errnoEnforceNGC(ret!=-1, errorMessage);
136 
137         return ret;
138     }
139 
140     /**
141      * Close the OS handle prematurely.
142      *
143      * Closes the OS handle. This happens automatically on struct destruction. It is only necessary to call this method if you wish to close
144      * the underlying FD before the struct goes out of scope.
145      *
146      * Throws:
147      * Nothing. There is nothing useful to do if close fails.
148      */
149     void close() nothrow @safe @nogc {
150         if( fd != InvalidFd ) {
151             .close(fd);
152         }
153 
154         fd = InvalidFd;
155     }
156 
157     /**
158       * Obtain the underlying OS handle
159       *
160       * This returns the underlying OS handle for use directly with OS calls.
161       *
162       * Warning:
163       * Do not use this function to directly call the close system call. Doing so may lead to quite difficult to debug problems across your
164       * program. If another part of the program gets the same FD number, it can be quite difficult to find out what went wrong.
165       */
166     @property int fileNo() pure nothrow @safe @nogc {
167         return fd;
168     }
169 
170     /**
171      * Report whether the FD currently holds a valid fd
172      *
173      * Additional_Details:
174      * Hold stick near centre of its length. Moisten pointed end in mouth. Insert in tooth space, blunt end next to gum. Use gentle in-out
175      * motion.
176      *
177      * See_Also:
178      * <a href="http://hitchhikers.wikia.com/wiki/Wonko_the_Sane">Wonko the sane</a>
179      *
180      * Returns: true if valid
181      */
182     @property bool isValid() pure const nothrow @safe @nogc {
183         return fd != InvalidFd;
184     }
185 
186     /** Duplicate an FD
187      *
188      * Does the same as the `dup` system call.
189      *
190      * Returns:
191      * An FD representing a duplicate of the current FD.
192      */
193     @notrace FD dup() @trusted @nogc {
194         import core.sys.posix.fcntl : fcntl;
195         import mecca.platform.os : F_DUPFD_CLOEXEC;
196 
197         int newFd = osCall!(fcntl)( F_DUPFD_CLOEXEC, 0 );
198         errnoEnforceNGC(newFd!=-1, "Failed to duplicate FD");
199         return FD( newFd );
200     }
201 }
202 
203 /**
204  * create an unnamed pipe pair
205  *
206  * Params:
207  *  readEnd = `FD` struct to receive the reading (output) end of the pipe
208  *  writeEnd = `FD` struct to receive the writing (input) end of the pipe
209  */
210 void createPipe(out FD readEnd, out FD writeEnd) @trusted @nogc {
211     int[2] pipeRawFD;
212 
213     errnoEnforceNGC( pipe2(pipeRawFD, O_CLOEXEC )>=0, "OS pipe creation failed" );
214 
215     readEnd = FD( pipeRawFD[0] );
216     writeEnd = FD( pipeRawFD[1] );
217 }
218 
219 unittest {
220     import core.stdc.errno;
221     import core.sys.posix.fcntl;
222     import std.conv;
223 
224     int fd1copy, fd2copy;
225 
226     {
227         auto fd = FD.adopt!"open"(open("/tmp/meccaUTfile1", O_CREAT|O_RDWR|O_TRUNC, octal!666));
228         fd1copy = fd.fileNo;
229 
230         fd.osCall!write("Hello, world\n".ptr, 13);
231         // The following line should not compile:
232         // fd.osCall!(.close)();
233 
234         unlink("/tmp/meccaUTfile1");
235 
236         fd = FD.adopt!"open"( open("/tmp/meccaUTfile2", O_CREAT|O_RDWR|O_TRUNC, octal!666) );
237         fd2copy = fd.fileNo;
238 
239         unlink("/tmp/meccaUTfile2");
240     }
241 
242     assert( .close(fd1copy)<0 && errno==EBADF, "FD1 was not closed" );
243     assert( .close(fd2copy)<0 && errno==EBADF, "FD2 was not closed" );
244 }
245 
246 /// A wrapper to perform buffered IO over another IO type
247 struct BufferedIO(T) {
248     enum MIN_BUFF_SIZE = 128;
249 
250 private:
251     MmapArray!ubyte rawMemory;
252     void[] readBuffer, writeBuffer;
253     size_t readBufferSize;
254     public T fd; // Declared public so it is visible through alias this
255 
256 public:
257     // BufferedIO is not copyable even if T is copyable (which it typically won't be)
258     @disable this(this);
259 
260     /** Construct an initialized buffered IO object
261      *
262      * `fd` is the FD object to wrap. Other arguments are the same as for the `open` call.
263      */
264     this(T fd, size_t bufferSize) {
265         this.open(bufferSize);
266         this.fd = move(fd);
267     }
268 
269     this(T fd, size_t readBufferSize, size_t writeBufferSize) {
270         this.open(readBufferSize, writeBufferSize);
271         this.fd = move(fd);
272     }
273 
274     /// Struct destructor
275     ///
276     /// Warning:
277     /// $(B The destructor does not flush outstanding writes). This is because it might be called from an exception
278     /// context where such flushes are not possible. Adding `scope(success) io.flush();` is recommended.
279     ~this() @safe @nogc {
280         closeNoFlush();
281         // No point in closing the underlying FD. Its own destructor should do that.
282     }
283 
284     /**
285      * Prepare the buffers.
286      *
287      * The first form sets the same buffer size for both read and write operations. The second sets the read and write
288      * buffer sizes independently.
289      */
290     void open(size_t bufferSize) @safe @nogc {
291         open( bufferSize, bufferSize );
292     }
293 
294     /// ditto
295     void open(size_t readBufferSize, size_t writeBufferSize) @safe @nogc {
296         ASSERT!"BufferedIO.open called twice"( rawMemory.closed );
297         assertGE( readBufferSize, MIN_BUFF_SIZE, "readBufferSize not big enough" );
298         assertGE( writeBufferSize, MIN_BUFF_SIZE, "writeBufferSize not big enough" );
299 
300         // Round readBufferSize to a multiple of the cacheline size, so that the write buffer be cache aligned
301         readBufferSize += CACHE_LINE_SIZE-1;
302         readBufferSize -= readBufferSize % CACHE_LINE_SIZE;
303 
304         size_t total = readBufferSize + writeBufferSize;
305 
306         // Round size up to next page
307         total += SYS_PAGE_SIZE - 1;
308         total -= total % SYS_PAGE_SIZE;
309 
310         rawMemory.allocate( total, false ); // We do NOT want the GC to scan this area
311         size_t added = total - readBufferSize - writeBufferSize;
312         added /= 2;
313         added -= added % CACHE_LINE_SIZE;
314         this.readBufferSize = readBufferSize + added;
315 
316         readBuffer = null;
317         writeBuffer = null;
318     }
319 
320     /** Close the buffered IO
321      *
322      * This flushes all outstanding writes, closes the underlying FD and releases the buffers.
323      */
324     void close() @notrace {
325         flush();
326         closeNoFlush();
327         fd.close();
328     }
329 
330     /** Forget all pending writes
331      *
332      * This causes the `BufferedIO` to discard all unflushed write data. Cached read data is unaffected.
333      */
334     void reset() @safe @nogc nothrow pure {
335         writeBuffer = null;
336     }
337 
338     /// Perform @safe buffered read
339     ///
340     /// Notice that if there is data already in the buffers, that data is what will be returned, even if the read
341     /// requested more (partial result).
342     auto read(ARGS...)(void[] buffer, ARGS args) @trusted @nogc {
343         size_t cachedLength = min(buffer.length, readBuffer.length);
344         if( cachedLength>0 ) {
345             // Data already in buffer
346             buffer[0..cachedLength][] = readBuffer[0..cachedLength][];
347             readBuffer = readBuffer[cachedLength..$];
348 
349             return cachedLength;
350         }
351 
352         cachedLength = fd.read(rawReadBuffer, args);
353         readBuffer = rawReadBuffer[0..cachedLength];
354 
355         if( cachedLength==0 )
356             return 0;
357 
358         // Call ourselves again. Since the buffer is now not empty, the call should succeed without performing the
359         // actual underlying read again.
360         return read(buffer, args);
361     }
362 
363     /** Read a single line
364      *
365      * Reads a single line from the buffered IO. The line must fit the struct's allocated read buffer.
366      *
367      * returns:
368      * The function returns a `const(char)[]` that points to the file's read buffer. This means that no copy is done
369      * (the data is read directly to its final resting place), but it also means that the data might disappear if
370      * further read operations are done on the file.
371      *
372      * Upon successful read, the returned range contains the terminating character as its last element. See Notes below
373      * for discussion on partial lines.
374      *
375      * In case of end of file, the function returns a slice of length 0.
376      *
377      * params:
378      * terminator = the line terminator to look for.
379      * partialOk = whether it is okay for the line to be partial. Defaults to `false`.
380      * args = the arguments for the underlying FD, if any. Can be used to pass `Timeout` to a `ReactorFD`.
381      *
382      * Notes:
383      * A partial line might happen in one of two cases. Either the line width is longer than the entire read buffer
384      * (as set when calling `open` or during construction), or the file sends an EOF without terminating the last line.
385      *
386      * What happens in that case depends on whether `partialOk` is set. If it is, the buffer is returned with no
387      * terminator as the last character. If `partialOk` is clear (the default) then the call throws a `ShortRead`
388      * exception.
389      */
390     const(char)[] readLine(char terminator = '\n') @trusted @nogc {
391         return readLine(terminator, false);
392     }
393 
394     const(char)[] readLine(ARGS...)(char terminator, bool partialOk, ARGS args) @trusted @nogc {
395         size_t readLineFromBuffer(size_t start, char terminator) @trusted @nogc nothrow {
396             const(char)[] convertedBuffer = cast(const(char)[])readBuffer;
397 
398             // Can't use std.algorithm.searching.find, as it's a GC function. Of course, the only reason it allocates is
399             // because it needs to throw range.empty if not found. Since we would then have to also catch that exception,
400             // using it quickly spirals out of control.
401             //
402             // Instead, we're going to spend the 4 lines it takes to roll our own :-(
403             foreach(i; start..readBuffer.length) {
404                 if( convertedBuffer[i]==terminator ) {
405                     return i+1;
406                 }
407             }
408 
409             return 0;
410         }
411 
412         // Fast path - the entire line is already present in the buffer
413         size_t intermediateLength = readLineFromBuffer(0, terminator);
414         if( intermediateLength!=0 ) {
415             const(char)[] result = cast(const(char)[])readBuffer[0..intermediateLength];
416             readBuffer = readBuffer[intermediateLength..$];
417 
418             return result;
419         }
420 
421         // Move the buffer we already have to the beginning of the memory, to maximize the efficiency of the next read
422         if( readBuffer.ptr!=rawReadBuffer.ptr ) {
423             // No point in moving memory if it's already at the beginning
424             foreach(i; 0..readBuffer.length) {
425                 // Can't use array operations because the ranges might overlap
426                 (cast(char[])rawReadBuffer)[i] = (cast(const(char)[])readBuffer)[i];
427             }
428 
429             readBuffer = rawReadBuffer[0..readBuffer.length];
430         }
431 
432         const(char)[] partialResult() {
433             if( partialOk ) {
434                 const(char)[] result = (cast(const(char)[])readBuffer);
435                 readBuffer = null;
436 
437                 return result;
438             } else {
439                 throw mkExFmt!ShortRead("readline unable to find terminator in %s bytes", readBuffer.length);
440             }
441         }
442 
443         intermediateLength = readBuffer.length;
444         size_t numRead;
445         size_t terminatorLocation;
446         do {
447             intermediateLength += numRead;
448             if( readBuffer.length==readBufferSize ) {
449                 return partialResult(); // No more room to expand
450             }
451 
452             numRead = fd.read(rawReadBuffer[intermediateLength..$], args);
453             if( numRead==0 )
454                 return partialResult(); // EOF
455 
456             readBuffer = rawReadBuffer[0..intermediateLength + numRead];
457         } while( (terminatorLocation = readLineFromBuffer(intermediateLength, terminator))==0 );
458 
459         DBG_ASSERT!"Terminator not found in successful loop completion: loc %s str \"%s\""(
460                 (cast(const(char)[])readBuffer)[terminatorLocation-1]==terminator, terminatorLocation,
461                 (cast(const(char)[])readBuffer)[0..terminatorLocation]);
462         auto result = (cast(const(char)[])readBuffer)[0..terminatorLocation];
463         readBuffer = readBuffer[terminatorLocation..$];
464 
465         return result;
466     }
467 
468     /// Perform @safe buffered write
469     ///
470     /// Function does not return until all data is either written to FD or buffered
471     void write(ARGS...)(const(void)[] buffer, ARGS args) @trusted @nogc {
472         if( buffer.length>rawWriteBuffer.length ) {
473             // Buffer is big - write it directly to save on copies
474             flush(args);
475             DBG_ASSERT!"write buffer is not empty after flush"(writeBuffer.length == 0);
476             while( buffer.length>0 ) {
477                 auto numWritten = fd.write(buffer, args);
478                 buffer = buffer[numWritten..$];
479             }
480 
481             return;
482         }
483 
484         while( buffer.length>0 ) {
485             size_t start = writeBuffer.length;
486             size_t writeSize = rawWriteBuffer.length - start;
487             writeSize = min(writeSize, buffer.length);
488 
489             writeBuffer = rawWriteBuffer[0 .. start+writeSize];
490             writeBuffer[start..$][] = buffer[0..writeSize][];
491 
492             buffer = buffer[writeSize..$];
493 
494             if( writeBuffer.length==rawWriteBuffer.length )
495                 flush(args);
496         }
497     }
498 
499     /// Flush the write buffers
500     void flush(ARGS...)(ARGS args) @trusted @nogc {
501         scope(failure) {
502             /* In case of mid-op failure, writeBuffer might end up not at the start of rawBuffer, which violates
503              * invariants assumed elsewhere in the code.
504              */
505             auto len = writeBuffer.length;
506             // Source and destination buffers may overlap, so we cannot use slice operation for the copy
507             foreach( i, d; cast(ubyte[])writeBuffer )
508                 (cast(ubyte[])rawWriteBuffer)[i] = d;
509             writeBuffer = rawWriteBuffer[0..len];
510         }
511 
512         while( writeBuffer.length>0 ) {
513             auto numWritten = fd.write(writeBuffer, args);
514             writeBuffer = writeBuffer[numWritten..$];
515         }
516     }
517 
518     /** Attach an underlying FD to the buffered IO instance
519      *
520      * Instance must be open and not already attached
521      */
522     ref BufferedIO opAssign(T fd) {
523         ASSERT!"Attaching fd to an open buffered IO"(!this.fd.isValid);
524         ASSERT!"Trying to attach an fd to a closed BufferedIO"( !rawMemory.closed );
525         move(fd, this.fd);
526 
527         return this;
528     }
529 
530     alias fd this;
531 private:
532     size_t writeBufferSize() const pure nothrow @safe @nogc {
533         return rawMemory.length - readBufferSize;
534     }
535 
536     @property void[] rawReadBuffer() pure nothrow @safe @nogc {
537         DBG_ASSERT!"Did not call open"(readBufferSize!=0);
538         DBG_ASSERT!"readBufferSize greater than total raw memory. %s<%s"(
539                 readBufferSize<rawMemory.length, readBufferSize, rawMemory.length );
540         return rawMemory[0..readBufferSize];
541     }
542     @property void[] rawWriteBuffer() pure nothrow @safe @nogc {
543         DBG_ASSERT!"readBufferSize greater than total raw memory. %s<%s"(
544                 readBufferSize<rawMemory.length, readBufferSize, rawMemory.length );
545         return rawMemory[readBufferSize..$];
546     }
547 
548     @notrace void closeNoFlush() @safe @nogc {
549         if( writeBuffer.length!=0 ) {
550             ERROR!"Closing BufferedIO while it still has unflushed data to write"();
551         }
552         rawMemory.free();
553         readBufferSize = 0;
554         readBuffer = null;
555         writeBuffer = null;
556     }
557 }
558 
559 unittest {
560     enum TestSize = 32000;
561     enum ReadBuffSize = 2000;
562     enum WriteBuffSize = 2000;
563 
564     ubyte[] reference;
565     uint numReads, numWrites;
566 
567     struct MockFD {
568         uint readOffset, writeOffset;
569         bool opened = true;
570 
571         ssize_t read(void[] buffer) @nogc {
572             auto len = min(buffer.length, reference.length - readOffset);
573             buffer[0..len] = reference[readOffset..readOffset+len][];
574             readOffset += len;
575             if( len>0 )
576                 numReads++;
577 
578             return len;
579         }
580 
581         size_t write(const void[] buffer) @nogc {
582             foreach(datum; cast(ubyte[])buffer) {
583                 assertEQ(datum, cast(ubyte)(reference[writeOffset]+1));
584                 writeOffset++;
585             }
586 
587             numWrites++;
588 
589             return buffer.length;
590         }
591 
592         @property bool isValid() const @nogc {
593             return opened;
594         }
595 
596         void close() @nogc {
597             opened = false;
598         }
599     }
600 
601     BufferedIO!MockFD fd;
602 
603     import std.random;
604     auto seed = unpredictableSeed;
605     scope(failure) ERROR!"Test failed with seed %s"(seed);
606     auto rand = Random(seed);
607 
608     reference.length = TestSize;
609     foreach(ref d; reference) {
610         d = uniform!ubyte(rand);
611     }
612 
613     fd.open(ReadBuffSize, WriteBuffSize);
614 
615     ubyte[17] buffer;
616     ssize_t numRead;
617     size_t total;
618     while( (numRead = fd.read(buffer))>0 ) {
619         total+=numRead;
620         buffer[0..numRead][] += 1;
621         fd.write(buffer[0..numRead]);
622     }
623 
624     fd.flush();
625 
626     assertEQ(total, TestSize, "Incorrect total number of bytes processed");
627     assertEQ(fd.readOffset, TestSize, "Did not read correct number of bytes");
628     assertEQ(fd.writeOffset, TestSize, "Did not write correct number of bytes");
629     assertEQ(numReads, 16);
630     assertEQ(numWrites, 16);
631 }
632 
633 unittest {
634     import mecca.lib.time;
635     import mecca.reactor;
636     import mecca.reactor.io.fd;
637     import mecca.reactor.subsystems.poller;
638 
639     void testBody() {
640         FD pipeReadFD, pipeWriteFD;
641         createPipe(pipeReadFD, pipeWriteFD);
642 
643         BufferedIO!ReactorFD pipeRead = BufferedIO!ReactorFD( ReactorFD(move(pipeReadFD)), 128, 4096-128);
644         ReactorFD pipeWrite = ReactorFD(move(pipeWriteFD));
645 
646         uint numLinesRead, numLinesFailed;
647 
648         auto timeout = Timeout(300.msecs);
649 
650         enum string[] expectedLines = [ "First line", "Second line", "", "Fourth line",
651             "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" ~
652                     "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
653             "aaaaaaaaaaaa", "" ];
654         void reader() {
655             const(char)[] line;
656             int i;
657             do {
658                 line = pipeRead.readLine('\n', true, timeout);
659                 if( line.length>0 && line[$-1]=='\n' ) {
660                     DEBUG!"Read \"%s\" expected \"%s\""( line[0..$-1], expectedLines[i] );
661                     assertEQ( expectedLines[i], line[0..$-1], ": Read the wrong data" );
662                     ++i;
663                     numLinesRead++;
664                 } else {
665                     DEBUG!"Read \"%s\" expected \"%s\""( line[0..$], expectedLines[i] );
666                     assertEQ( expectedLines[i], line[0..$], ": Read the wrong data" );
667                     ++i;
668                     numLinesFailed++;
669                 }
670             } while(line.length>0);
671         }
672 
673         theReactor.spawnFiber(&reader);
674 
675         void writeData(string data) {
676             pipeWrite.write(data, timeout);
677             poller.poll(); // Otherwise the other fiber won't get scheduled
678             theReactor.yield();
679         }
680 
681         // Write two complete lines and one incomplete one
682         writeData("First line\nSecond line\n\nFour");
683         assertEQ(numLinesRead, 3, "Read lines count incorrect");
684         assertEQ(numLinesFailed, 0, "Failed lines count incorrect");
685 
686         writeData("th lin");
687         assertEQ(numLinesRead, 3, "Read lines count incorrect");
688         assertEQ(numLinesFailed, 0, "Failed lines count incorrect");
689 
690         writeData("e\n");
691         assertEQ(numLinesRead, 4, "Read lines count incorrect");
692         assertEQ(numLinesFailed, 0, "Failed lines count incorrect");
693 
694         writeData("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); // 70 "a"s
695         assertEQ(numLinesRead, 4, "Read lines count incorrect");
696         assertEQ(numLinesFailed, 0, "Failed lines count incorrect");
697 
698         writeData("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); // 70 more "a"s
699         assertEQ(numLinesRead, 4, "Read lines count incorrect");
700         assertEQ(numLinesFailed, 1, "Failed lines count incorrect");
701 
702         destroy(pipeWrite);
703         poller.poll(); // Otherwise the other fiber won't get scheduled
704         theReactor.yield();
705 
706         assertEQ(numLinesRead, 4, "Read lines count incorrect");
707         assertEQ(numLinesFailed, 3, "Failed lines count incorrect");
708     }
709 
710     testWithReactor(&testBody);
711 }