1 /// Reactor aware process management
2 module mecca.reactor.lib.process;
3 
4 // Licensed under the Boost license. Full copyright information in the AUTHORS file
5 
6 import core.stdc.errno;
7 import core.sys.posix.sys.wait;
8 import core.sys.posix.unistd;
9 import std.algorithm : move;
10 import std.conv : emplace;
11 import process = std.process;
12 
13 import mecca.containers.pools;
14 import mecca.lib.exception;
15 import mecca.lib.io;
16 import mecca.lib.time;
17 import mecca.log;
18 import mecca.platform.os : OSSignal;
19 import mecca.reactor;
20 import mecca.reactor.io.fd;
21 import mecca.reactor.io.signals;
22 import mecca.reactor.sync.event;
23 
/** Subprocess handler
 *
 * Tracks a single child process: its pid, the redirection of its three standard streams and the status it
 * terminated with.
 *
 * Do not create direct instances of this struct. Use `ProcessManager.alloc` instead.
 */
struct Process {
    /// The three IO streams that can be redirected
    enum StdIO {
        StdIn = 0,      /// Child's stdin stream.
        StdOut,         /// Child's stdout stream.
        StdErr          /// Child's stderr stream.
    }
private:
    // Prevent accidental copying
    @disable this(this);

    // 0 until `run` successfully created the child
    pid_t _pid;
    // Set by `handleExit` once the child's termination was reported
    Event processDone;
    // Child side end of standard streams. Invalid entries mean "not redirected".
    FD[3] childStdIO;
    // Status handed to `handleExit`
    int exitStatus;
    version(assert) {
        // Set by `ProcessManager.alloc`, checked by `verifyManagerInitialized`
        bool managerInitialized;
    }

public:
    /// Parent side end of standard streams
    ReactorFD[3] stdIO;

    /** Redirect a standard IO for the child to an FD
     *
     * Any parent side end previously stored in `stdIO[io]` is closed. Must be called before `run`.
     *
     * Params:
     * io = which of the IO streams to redirect
     * fd = An fd to be assigned. Ownership is moved to the child
     */
    void redirectIO(StdIO io, FD fd) nothrow @safe @nogc {
        verifyManagerInitialized();
        ASSERT!"Cannot set redirection on child once it has been run"(pid==0);
        stdIO[io].close();
        move(fd, childStdIO[io]);
    }

    /** Redirect a standard IO for the child to a pipe
     *
     * Redirects the child's IO to a pipe whose other side is accessible by the parent.
     * The parent's end of the pipe is available as `stdIO[io]`. Must be called before `run`.
     *
     * Params:
     * io = which of the IO streams to redirect
     */
    void redirectIO(StdIO io) @trusted @nogc {
        verifyManagerInitialized();
        ASSERT!"Cannot set redirection on child once it has been run"(pid==0);

        // For stdin the child reads and the parent writes. For stdout/stderr it is the other way around.
        FD parentEnd;
        FD* writeEnd, readEnd;
        if( io==StdIO.StdIn ) {
            writeEnd = &parentEnd;
            readEnd = &childStdIO[io];
        } else {
            writeEnd = &childStdIO[io];
            readEnd = &parentEnd;
        }

        createPipe( *readEnd, *writeEnd );
        stdIO[io] = ReactorFD( move(parentEnd) );
    }

    /** Redirect stderr to stdout
     *
     * Make the child's stderr point to the same stream as its stdout. The child's stdout must already have been
     * redirected. Any parent side end stored in `stdIO[StdIO.StdErr]` is closed. Must be called before `run`.
     */
    void redirectErrToOut() @safe @nogc {
        verifyManagerInitialized();
        ASSERT!"Cannot set redirection on child once it has been run"(pid==0);
        ASSERT!"Cannot redirect when stdout is not valid"(childStdIO[StdIO.StdOut].isValid);

        childStdIO[StdIO.StdErr] = childStdIO[StdIO.StdOut].dup();
        stdIO[StdIO.StdErr].close();
    }

    /// Test whether the child's termination has not been reported yet
    ///
    /// Note that this also returns `true` for a child that was not started yet.
    bool isRunning() const nothrow @safe @nogc {
        return !processDone.isSet;
    }

    /** Suspend fiber until the child process finishes
     *
     * Params:
     * timeout = how long to wait for the child's termination to be reported
     *
     * Returns:
     * The status recorded when the child terminated. When the termination is reported by the `ProcessManager`
     * SIGCHLD handler, this is the raw status filled in by `waitpid`. Decode it with `WIFEXITED`/`WEXITSTATUS`/
     * `WIFSIGNALED` rather than treating it as the exit code.
     */
    int wait(Timeout timeout = Timeout.infinite) @safe @nogc {
        processDone.wait(timeout);

        return exitStatus;
    }

    /** Run the child with the given arguments.
     *
     * This function returns immediately. Use `wait` if you want to wait for the child to finish execution.
     *
     * Params:
     * args = command line to run. `args[0]` is the command, looked up through `PATH`.
     *
     * Errors:
     * On failure to create a child process, this function will throw an `ErrnoException`. The redirections that
     * were set up are left in place, and `pid` remains 0.
     * If the exec itself fails (e.g.: trying to run a command that doesn't exist), this function will report success,
     * but the child exits with code 255 (i.e. `WEXITSTATUS` of the value returned by `wait` is 255).
     */
    void run(string[] args...) @trusted @nogc {
        verifyManagerInitialized();
        import mecca.lib.reflection : as;

        /*
           We are @nogc. @nogc states that the function will not trigger a GC before it returns.

           This function creates a child process. The parent doesn't trigger a GC. The child calls execve, and doesn't
           return.

           So we are @nogc.
         */
        ASSERT!"run must be called with at least one argument"(args.length>0);
        _pid = vfork();

        if( _pid==0 ) {
            // Child side. runChildHelper never returns: it either execs or calls _exit.
            as!"@nogc"({ runChildHelper(args); });
        }

        if( _pid<0 ) {
            // vfork failed and no child exists. Restore the "not started" state before reporting, so that we
            // never register a bogus pid. Nothing between vfork and the enforce below touches errno.
            _pid = 0;
            errnoEnforceNGC( false, "Failed to create child process" );
        }

        // Cleanups: the parent has no further use for the child's ends of the streams
        foreach( ref fd; childStdIO ) {
            fd.close();
        }

        as!"@nogc"({ theProcessManager.processes[_pid] = &this; });

        INFO!"Launched child process %s"(pid);
    }

    /** Send the process a signal
     *
     * Params:
     * signal = Signal to send. SIGTERM by default
     *
     * Throws:
     * Throws `ErrnoException` if sending the signal failed
     */
    void kill(OSSignal signal = OSSignal.SIGTERM) @trusted @nogc {
        ASSERT!"Cannot kill process before it was started"(pid!=0);

        errnoEnforceNGC( .kill( pid, signal )==0, "Failed to send signal to process" );
    }

    /// Returns the pid of the child.
    ///
    /// Returns 0 if child has not started running
    @property pid_t pid() const pure nothrow @safe @nogc {
        return _pid;
    }

    // Do not call!!!
    // Pool hook: resets the struct to its initial state.
    void _poolElementInit() nothrow @safe @nogc {
        emplace(&this);
    }

private:
    // runChildHelper is called in a vfork'd child, any data modifications affect the parent process, so must not affect
    // tracing indent level/etc, thus @notrace is mandatory!
    //
    // Never returns: on success the process image is replaced, on any failure the child calls _exit(255).
    @notrace void runChildHelper(string[] args) nothrow @trusted {
        try {
            sigset_t emptyMask;
            errnoEnforceNGC( sigprocmask( SIG_SETMASK, &emptyMask, null )==0, "sigprocmask failed" );

            // Perform IO redirection
            foreach( int ioNum, ref fd ; childStdIO ) {
                // Leave IO streams that were not redirected as they are
                if( !fd.isValid )
                    continue;

                fd.checkedCall!dup2(ioNum, "Failed to redirect IO");
            }

            // std.process.execvp does not throw when the exec fails. It returns -1. Getting past this line
            // therefore means the command could not be executed.
            process.execvp(args[0], args);
            ERROR!"Execution of %s failed: errno %s"( args[0], errno );
        } catch(Exception ex) {
            ERROR!"Execution of %s failed: %s"( args[0], ex.msg );
        }

        // Must not return into the parent's stack frame, and must not run the parent's cleanup handlers.
        _exit(255);
    }

    // Record the child's termination status and wake up anyone blocked in `wait`
    void handleExit(int status) nothrow @safe {
        exitStatus = status;
        processDone.set();

        // Drop the pid from the tracking table. Merely resetting the slot would leave the key behind with a
        // null pointer, which a later notification for a reused pid would then dereference.
        theProcessManager.processes.remove(pid);
    }

    // Debug aid: catch instances that were not obtained through ProcessManager.alloc
    void verifyManagerInitialized() pure const nothrow @safe @nogc {
        version(assert) {
            DBG_ASSERT!"Process was directly initialized. Use ProcessManager.alloc instead."( managerInitialized );
        }
    }
}
222 
/// Process tracking management
///
/// Owns the pool of `Process` structs, maps pids of running children to their `Process`, and dispatches SIGCHLD
/// notifications to them.
struct ProcessManager {
private:
    // Prevent accidental copying
    @disable this(this);

    // Size of the read buffer used by `_commandOutputFiber`
    enum OUTPUT_BUFF_SIZE = 1024;

    SimplePool!Process processPool;
    // Children that were started and whose exit we still track, keyed by pid
    Process*[pid_t] processes; // XXX change to a nogc construct
    // Optional user handler for notifications we do not consume ourselves
    void delegate(pid_t pid, int exitStatus) nothrow @system customHandler;

    /// Owning wrapper for a pool allocated process. Unregisters and releases the process when destructed.
    static struct ProcessPtr {
        Process* ptr;

        @disable this(this);

        ~this() nothrow @safe @nogc {
            if( ptr is null )
                return;

            // Only drop the table entry if it is ours (or a stale null slot). The pid may have been recycled
            // by the OS and now belong to another, still tracked, Process.
            Process** entry = ptr.pid in theProcessManager.processes;
            if( entry !is null && (*entry is null || *entry is ptr) )
                theProcessManager.processes.remove(ptr.pid);

            theProcessManager.processPool.release(ptr);
        }

        alias ptr this;
    }

public:
    /** initialize the process manager
     *
     * You must call this before trying to spawn new processes. Must be called when the reactor is already active.
     *
     * Params:
     * maxProcesses = Maximum number of concurrent processes to be tracked.
     */
    void open(size_t maxProcesses) @trusted @nogc {
        processPool.open(maxProcesses);

        reactorSignal.registerHandler(OSSignal.SIGCHLD, &sigChildHandler);
        customHandler = null;
    }

    /** Shut down the process manager
     *
     * There is no real need to call this. This is mostly useful for unit tests. Must be called while the reactor is
     * still active.
     */
    void close() @trusted @nogc {
        reactorSignal.unregisterHandler(OSSignal.SIGCHLD);
        processPool.close();
    }

    /** Allocate a new handler for a child process.
     *
     * This is the way to allocate a new child process handler.
     *
     * Returns a smart object referencing the Process struct. Struct is destructed when the reference is destructed.
     * If the reference is destructed, this is effectively the same as detaching from the running process. The process
     * remains running, but we will no longer be notified when it exits.
     */
    ProcessPtr alloc() nothrow @safe @nogc {
        Process* ptr = processPool.alloc();
        version(assert) {
            ptr.managerInitialized = true;
        }

        return ProcessPtr(ptr);
    }

    /// Callback type used by `collectCommandOutput`
    alias OutputHandlerDlg = void delegate(Process* child, const(char)[] output) nothrow;
    /**
     * Run a command and pass its output to a callback
     *
     * Run a command with the supplied command line. Call `outputProcessor` whenever the command outputs anything on
     * stdout or stderr. The command is run from a newly spawned fiber, so this function returns without waiting
     * for it.
     *
     * In addition to those, outputProcessor gets called three more times. The first time is right before the process is
     * started (with `output` set to empty). This allows outputProcessor to do any additional manipulations (such as
     * redirecting the input). This can be tested for because `child.pid` is 0.
     *
     * The second time is when the output closes. Again, this call will have `output` set to an empty range.
     *
     * The third time is after the process has already exited. This can be tested for because `child.isRunning` will be
     * false.
     *
     * Params:
     * outputProcessor = callback receiving the child and each chunk of its output
     * cmdline = command to run, followed by its arguments. Must not be empty.
     */
    void collectCommandOutput( OutputHandlerDlg outputProcessor, string[] cmdline... ) {
        ASSERT!"cmdline must have at least one argument (command to run)"(cmdline.length > 0);
        theReactor.spawnFiber!_commandOutputFiber(outputProcessor, cmdline);
    }

    /** Register custom SIGCHLD handler
     *
     * With this function you can register a custom SIGCHLD handler. This handler will be called only for child
     * processes that were not started by the ProcessManager, or for child processes that did not exit.
     *
     * The handler delegate will be called under a critical section lock and must not sleep.
     */
    void registerSigChldHandler(void delegate(pid_t pid, int status) nothrow @system handler) nothrow @safe @nogc {
        customHandler = handler;
    }

    // SIGCHLD handler: reaps every child that has a pending status and routes each notification either to the
    // tracked Process or to the custom handler.
    package(mecca.reactor) void sigChildHandler(OSSignal) @system {
        // Ignore the siginfo, as SIGCHLD notifications might be merged

        bool handled;
        int status;
        pid_t childPid;
        while( (childPid = waitpid(-1, &status, WNOHANG))>0 ) {
            handled = true;

            Process** child = childPid in processes;
            // A slot may exist but hold null (e.g. the exit of a previous owner of this pid was already
            // handled). Treat that the same as an untracked pid rather than dereferencing null.
            const bool tracked = child !is null && *child !is null;

            if( (WIFEXITED(status) || WIFSIGNALED(status)) && tracked ) {
                INFO!"Received child exit notification from %s: 0x%x"(childPid, status);
                (*child).handleExit(status);
            } else {
                if( customHandler !is null ) {
                    customHandler( childPid, status );
                } else {
                    // This is probably because we abandoned the child PID
                    INFO!"Received child exit notification from abandoned child %s status 0x%x"( childPid, status );
                }
            }
        }

        if( !handled ) {
            // This isn't as serious a condition as the code might suggest. We might have handled that PID in a previous
            // invocation.
            DEBUG!"SIGCHLD reported, but wait failed with errno %s"(errno);
        }
    }

    // Fiber function for collectCommandOutput
    // DMDBUG public because otherwise we can't template on it
    public static void _commandOutputFiber( OutputHandlerDlg outputProcessor, string[] cmdline ) {
        auto child = theProcessManager.alloc();
        // Both stdout and stderr of the child end up in the same pipe
        child.redirectIO( Process.StdIO.StdOut );
        child.redirectErrToOut();

        // First call, before we run anything
        outputProcessor(child, null);

        child.run(cmdline);

        char[OUTPUT_BUFF_SIZE] buffer;
        ssize_t numRead;
        do {
            numRead = child.stdIO[Process.StdIO.StdOut].read(buffer);
            // Call with what we got. This will be the empty range the last time we call
            outputProcessor( child, buffer[0..numRead] );
        } while( numRead>0 );

        child.wait();

        // Final call, after the child was reaped
        outputProcessor( child, null );
    }
}
383 
// Backing storage for the process manager singleton. Access it through `theProcessManager`.
private __gshared ProcessManager _theProcessManager;

/// Accessor returning a reference to the process manager singleton
@notrace ref ProcessManager theProcessManager() nothrow @trusted @nogc {
    return _theProcessManager;
}
389 
// Runs `echo` with its output redirected to a pipe and checks what was collected. Also starts a second child
// whose handle is dropped right away, to exercise the detached child path.
unittest {
    import mecca.reactor;
    enum GRACE_UT_TIMEOUT = 10.seconds;

    testWithReactor({
            theProcessManager.open(24);
            scope(exit) theProcessManager.close();

            {
                // Detached child: the handle goes out of scope while the child may still be running
                auto detached = theProcessManager.alloc();
                detached.run("true");
            }

            version (linux)
                enum expectedBuffer = "Hello\r world\n";
            else version (Darwin)
                enum expectedBuffer = "-e Hello\\r world\n"; // BSD `echo` doesn't support the `-e` flag
            else
                static assert(false, "Unsupported platform");

            auto echoChild = theProcessManager.alloc();
            echoChild.redirectIO(Process.StdIO.StdOut);
            echoChild.redirectErrToOut();

            echoChild.run("echo", "-e", "Hello\\r", "world");

            // Grow the buffer one chunk at a time, then trim it to what was actually read. A read that
            // returns nothing marks the end of the output.
            enum readChunk = 128;
            char[] collected;
            while( true ) {
                const size_t filled = collected.length;
                collected.length = filled + readChunk;
                const ssize_t got = echoChild.stdIO[Process.StdIO.StdOut].read(
                        collected[filled..filled+readChunk], Timeout(GRACE_UT_TIMEOUT));
                collected.length = filled + got;

                if( got<=0 )
                    break;
            }

            DEBUG!"Child stdout: %s"(collected);
            echoChild.wait(Timeout(GRACE_UT_TIMEOUT));

            theReactor.sleep(4.msecs); // Give the detached child time to actually exit

            ASSERT!"Child closed stdout but is still running"(!echoChild.isRunning);
            ASSERT!"Child output is \"%s\", not as expected"( collected == expectedBuffer, collected );
        });
}
436 
// Feeds `cat` from /dev/zero through collectCommandOutput, verifies that only zero bytes come back, and kills
// the child once enough output was seen.
unittest {
    import mecca.reactor;

    // State shared between the test body and the output callback
    struct RunContext {
        Event finished;
        uint zerosSeen;
        bool termSent;

        void callback( Process* child, const(char)[] buffer ) nothrow {
            try {
                const bool notStartedYet = child.pid == 0;
                if( notStartedYet ) {
                    // Pre-run call: hook the child's stdin to an endless stream of zeros
                    child.redirectIO(Process.StdIO.StdIn, FD( "/dev/zero", O_RDONLY ));

                    return;
                }

                foreach(offset, ch; buffer) {
                    ASSERT!"buffer not filled with 0, has %s instead at offset %s"(
                            ch=='\0', ch, offset+zerosSeen);
                }

                zerosSeen += buffer.length;

                // Once we have seen enough, ask the child to terminate. Do it only once.
                const bool overThreshold = zerosSeen > 8196;
                if( overThreshold && !termSent ) {
                    child.kill();
                    termSent = true;
                }

                if( child.isRunning )
                    return;

                // The call made after the child was reaped releases the waiting test body
                finished.set();
            } catch(Exception ex) {
                ERROR!"Failed: %s"(ex.msg);
                assert(false);
            }
        }
    }

    testWithReactor({
            theProcessManager.open(24);
            scope(exit) theProcessManager.close();

            import mecca.reactor.io.signals;

            void termHandler(OSSignal) {
                // Intentionally empty. This handler is not expected to run.
            }

            // Register a TERM handler so that the signal is masked in the parent
            reactorSignal.registerHandler(OSSignal.SIGTERM, &termHandler);

            RunContext ctx;

            theProcessManager.collectCommandOutput( &ctx.callback, "cat" );

            ctx.finished.wait();
        });
}